diff --git a/client/internal/uploader/create_load.go b/client/internal/uploader/create_load.go index 48df400..e7cf197 100644 --- a/client/internal/uploader/create_load.go +++ b/client/internal/uploader/create_load.go @@ -9,36 +9,33 @@ import ( "time" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage2/client/internal/db" "gitlink.org.cn/cloudream/storage2/client/types" - stgglb "gitlink.org.cn/cloudream/storage2/common/globals" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser" - coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" ) type CreateLoadUploader struct { - pkg cdssdk.Package - userID cdssdk.UserID + pkg types.Package targetSpaces []types.UserSpaceDetail loadRoots []string uploader *Uploader distlock *distlock.Mutex - successes []coormq.AddObjectEntry + successes []db.AddObjectEntry lock sync.Mutex commited bool } type CreateLoadResult struct { - Package cdssdk.Package - Objects map[string]cdssdk.Object + Package types.Package + Objects map[string]types.Object } func (u *CreateLoadUploader) Upload(pa string, size int64, stream io.Reader) error { uploadTime := time.Now() - spaceIDs := make([]cdssdk.StorageID, 0, len(u.targetSpaces)) + spaceIDs := make([]types.UserSpaceID, 0, len(u.targetSpaces)) ft := ioswitch2.FromTo{} fromExec, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream()) @@ -46,7 +43,7 @@ func (u *CreateLoadUploader) Upload(pa string, size int64, stream io.Reader) err for i, space := range u.targetSpaces { ft.AddTo(ioswitch2.NewToShardStore(*space.MasterHub, space, ioswitch2.RawStream(), "fileHash")) ft.AddTo(ioswitch2.NewLoadToPublic(*space.MasterHub, space, path.Join(u.loadRoots[i], pa))) - spaceIDs = append(spaceIDs, space.Storage.StorageID) + spaceIDs = append(spaceIDs, space.UserSpace.UserSpaceID) } plans := exec.NewPlanBuilder() @@ -69,12 +66,12 @@ func (u *CreateLoadUploader) Upload(pa string, size int64, stream io.Reader) err // 记录上传结果 fileHash := ret["fileHash"].(*ops2.ShardInfoValue).Hash - u.successes = append(u.successes, coormq.AddObjectEntry{ - Path: pa, - Size: size, - FileHash: fileHash, - UploadTime: uploadTime, - StorageIDs: spaceIDs, + u.successes = append(u.successes, db.AddObjectEntry{ + Path: pa, + Size: size, + FileHash: fileHash, + UploadTime: uploadTime, + UserSpaceIDs: spaceIDs, }) return nil } @@ -90,31 +87,25 @@ func (u *CreateLoadUploader) Commit() (CreateLoadResult, error) { defer u.distlock.Unlock() - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return CreateLoadResult{}, fmt.Errorf("new coordinator client: %w", err) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - updateResp, err := coorCli.UpdatePackage(coormq.NewUpdatePackage(u.pkg.PackageID, u.successes)) + var addedObjs []types.Object + err := u.uploader.db.DoTx(func(tx db.SQLContext) error { + var err error + addedObjs, err = u.uploader.db.Object().BatchAdd(tx, u.pkg.PackageID, u.successes) + return err + }) if err != nil { - return CreateLoadResult{}, fmt.Errorf("updating package: %w", err) + return CreateLoadResult{}, fmt.Errorf("adding objects: %w", err) } ret := CreateLoadResult{ Package: u.pkg, - Objects: make(map[string]cdssdk.Object), + Objects: make(map[string]types.Object), } - for _, entry := range updateResp.Added { + for _, entry := range addedObjs { ret.Objects[entry.Path] = entry } - for i, stg := range u.targetSpaces { - // 不关注是否成功 - coorCli.StoragePackageLoaded(coormq.ReqStoragePackageLoaded(u.userID, stg.Storage.StorageID, u.pkg.PackageID, u.loadRoots[i], nil)) - } - return ret, nil } diff --git a/client/internal/uploader/update.go b/client/internal/uploader/update.go index 4d628b7..6986817 100644 --- a/client/internal/uploader/update.go +++ b/client/internal/uploader/update.go @@ -10,37 +10,35 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgglb "gitlink.org.cn/cloudream/storage2/common/globals" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + "gitlink.org.cn/cloudream/storage2/client/internal/db" + "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser" - coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" ) type UpdateUploader struct { - uploader *Uploader - pkgID cdssdk.PackageID - targetStg stgmod.StorageDetail - distMutex *distlock.Mutex - loadToStgs []stgmod.StorageDetail - loadToPath []string - successes []coormq.AddObjectEntry - lock sync.Mutex - commited bool + uploader *Uploader + pkgID types.PackageID + targetSpace types.UserSpaceDetail + distMutex *distlock.Mutex + loadToSpaces []types.UserSpaceDetail + loadToPath []string + successes []db.AddObjectEntry + lock sync.Mutex + commited bool } -type UploadStorageInfo struct { - Storage stgmod.StorageDetail +type UploadSpaceInfo struct { + Space types.UserSpaceDetail Delay time.Duration IsSameLocation bool } type UpdateResult struct { // 上传成功的文件列表,Key为Path - Objects map[string]cdssdk.Object + Objects map[string]types.Object } func (w *UpdateUploader) Upload(pat string, stream io.Reader) error { @@ -49,10 +47,10 @@ func (w *UpdateUploader) Upload(pat string, stream io.Reader) error { ft := ioswitch2.NewFromTo() fromExec, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream()) ft.AddFrom(fromExec). - AddTo(ioswitch2.NewToShardStore(*w.targetStg.MasterHub, w.targetStg, ioswitch2.RawStream(), "shardInfo")) + AddTo(ioswitch2.NewToShardStore(*w.targetSpace.MasterHub, w.targetSpace, ioswitch2.RawStream(), "shardInfo")) - for i, stg := range w.loadToStgs { - ft.AddTo(ioswitch2.NewLoadToPublic(*stg.MasterHub, stg, path.Join(w.loadToPath[i], pat))) + for i, space := range w.loadToSpaces { + ft.AddTo(ioswitch2.NewLoadToPublic(*space.MasterHub, space, path.Join(w.loadToPath[i], pat))) } plans := exec.NewPlanBuilder() @@ -75,12 +73,12 @@ func (w *UpdateUploader) Upload(pat string, stream io.Reader) error { // 记录上传结果 shardInfo := ret["shardInfo"].(*ops2.ShardInfoValue) - w.successes = append(w.successes, coormq.AddObjectEntry{ - Path: pat, - Size: shardInfo.Size, - FileHash: shardInfo.Hash, - UploadTime: uploadTime, - StorageIDs: []cdssdk.StorageID{w.targetStg.Storage.StorageID}, + w.successes = append(w.successes, db.AddObjectEntry{ + Path: pat, + Size: shardInfo.Size, + FileHash: shardInfo.Hash, + UploadTime: uploadTime, + UserSpaceIDs: []types.UserSpaceID{w.targetSpace.UserSpace.UserSpaceID}, }) return nil } @@ -90,7 +88,7 @@ func (w *UpdateUploader) CancelObject(path string) { w.lock.Lock() defer w.lock.Unlock() - w.successes = lo.Reject(w.successes, func(e coormq.AddObjectEntry, i int) bool { + w.successes = lo.Reject(w.successes, func(e db.AddObjectEntry, i int) bool { return e.Path == path }) } @@ -119,22 +117,21 @@ func (w *UpdateUploader) Commit() (UpdateResult, error) { defer w.distMutex.Unlock() - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return UpdateResult{}, fmt.Errorf("new coordinator client: %w", err) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - updateResp, err := coorCli.UpdatePackage(coormq.NewUpdatePackage(w.pkgID, w.successes)) + var addedObjs []types.Object + err := w.uploader.db.DoTx(func(tx db.SQLContext) error { + var err error + addedObjs, err = w.uploader.db.Object().BatchAdd(tx, w.pkgID, w.successes) + return err + }) if err != nil { - return UpdateResult{}, fmt.Errorf("updating package: %w", err) + return UpdateResult{}, fmt.Errorf("adding objects: %w", err) } ret := UpdateResult{ - Objects: make(map[string]cdssdk.Object), + Objects: make(map[string]types.Object), } - for _, entry := range updateResp.Added { + for _, entry := range addedObjs { ret.Objects[entry.Path] = entry } diff --git a/client/internal/uploader/uploader.go b/client/internal/uploader/uploader.go index 239bcc5..d3a5c2f 100644 --- a/client/internal/uploader/uploader.go +++ b/client/internal/uploader/uploader.go @@ -10,112 +10,104 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/sort2" - "gitlink.org.cn/cloudream/scheduler/common/pkgs/db" - cdssdk "gitlink.org.cn/cloudream/storage2/client/types" - stgglb "gitlink.org.cn/cloudream/storage2/common/globals" - stgmod "gitlink.org.cn/cloudream/storage2/common/models" + "gitlink.org.cn/cloudream/storage2/client/internal/db" + "gitlink.org.cn/cloudream/storage2/client/internal/metacache" + "gitlink.org.cn/cloudream/storage2/client/internal/publics" + "gitlink.org.cn/cloudream/storage2/client/types" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" - "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/storage2/common/pkgs/ioswitch2/parser" - "gitlink.org.cn/cloudream/storage2/common/pkgs/metacache" - coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory" ) type Uploader struct { distlock *distlock.Service connectivity *connectivity.Collector stgAgts *agtpool.AgentPool - stgMeta *metacache.UserSpaceMeta + spaceMeta *metacache.UserSpaceMeta db *db.DB } -func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgAgts *agtpool.AgentPool, stgMeta *metacache.UserSpaceMeta, db *db.DB) *Uploader { +func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgAgts *agtpool.AgentPool, spaceMeta *metacache.UserSpaceMeta, db *db.DB) *Uploader { return &Uploader{ distlock: distlock, connectivity: connectivity, stgAgts: stgAgts, - stgMeta: stgMeta, + spaceMeta: spaceMeta, } } -func (u *Uploader) BeginUpdate(pkgID cdssdk.PackageID, affinity cdssdk.UserSpaceID, loadTo []cdssdk.UserSpaceID, loadToPath []string) (*UpdateUploader, error) { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() +func (u *Uploader) BeginUpdate(pkgID types.PackageID, affinity types.UserSpaceID, loadTo []types.UserSpaceID, loadToPath []string) (*UpdateUploader, error) { + spaceIDs, err := u.db.UserSpace().GetAllIDs(u.db.DefCtx()) if err != nil { - return nil, fmt.Errorf("new coordinator client: %w", err) + return nil, fmt.Errorf("getting user space ids: %w", err) } - defer stgglb.CoordinatorMQPool.Release(coorCli) - getUserStgsResp, err := coorCli.GetUserStorageDetails(coormq.ReqGetUserStorageDetails(userID)) - if err != nil { - return nil, fmt.Errorf("getting user storages: %w", err) - } + spaceDetails := u.spaceMeta.GetMany(spaceIDs) + spaceDetails = lo2.RemoveAllDefault(spaceDetails) cons := u.connectivity.GetAll() - var userStgs []UploadStorageInfo - for _, stg := range getUserStgsResp.Storages { - if stg.MasterHub == nil { + var uploadSpaces []UploadSpaceInfo + for _, space := range spaceDetails { + if space.MasterHub == nil { continue } - delay := time.Duration(math.MaxInt64) + latency := time.Duration(math.MaxInt64) - con, ok := cons[stg.MasterHub.HubID] + con, ok := cons[space.MasterHub.HubID] if ok && con.Latency != nil { - delay = *con.Latency + latency = *con.Latency } - userStgs = append(userStgs, UploadStorageInfo{ - Storage: stg, - Delay: delay, - IsSameLocation: stg.MasterHub.LocationID == stgglb.Local.LocationID, + uploadSpaces = append(uploadSpaces, UploadSpaceInfo{ + Space: space, + Delay: latency, + IsSameLocation: space.MasterHub.LocationID == publics.Local.LocationID, }) } - if len(userStgs) == 0 { + if len(uploadSpaces) == 0 { return nil, fmt.Errorf("user no available storages") } - loadToStgs := make([]stgmod.StorageDetail, len(loadTo)) - for i, stgID := range loadTo { - stg, ok := lo.Find(getUserStgsResp.Storages, func(stg stgmod.StorageDetail) bool { - return stg.Storage.StorageID == stgID + loadToSpaces := make([]types.UserSpaceDetail, len(loadTo)) + for i, spaceID := range loadTo { + space, ok := lo.Find(spaceDetails, func(space *types.UserSpaceDetail) bool { + return space.UserSpace.UserSpaceID == spaceID }) if !ok { - return nil, fmt.Errorf("load to storage %v not found", stgID) - } - if stg.MasterHub == nil { - return nil, fmt.Errorf("load to storage %v has no master hub", stgID) + return nil, fmt.Errorf("load to storage %v not found", spaceID) } - if !factory.GetBuilder(stg).PublicStoreDesc().Enabled() { - return nil, fmt.Errorf("load to storage %v has no public store", stgID) + if space.MasterHub == nil { + return nil, fmt.Errorf("load to storage %v has no master hub", spaceID) } - loadToStgs[i] = stg + loadToSpaces[i] = *space } - target := u.chooseUploadStorage(userStgs, affinity) + target := u.chooseUploadStorage(uploadSpaces, affinity) + // TODO2 加锁 // 给上传节点的IPFS加锁 // TODO 考虑加Object的Create锁 // 防止上传的副本被清除 - distMutex, err := reqbuilder.NewBuilder().Shard().Buzy(target.Storage.Storage.StorageID).MutexLock(u.distlock) - if err != nil { - return nil, fmt.Errorf("acquire distlock: %w", err) - } + // distMutex, err := reqbuilder.NewBuilder().Shard().Buzy(target.Space.Storage.StorageID).MutexLock(u.distlock) + // if err != nil { + // return nil, fmt.Errorf("acquire distlock: %w", err) + // } return &UpdateUploader{ - uploader: u, - pkgID: pkgID, - targetStg: target.Storage, - distMutex: distMutex, - loadToStgs: loadToStgs, - loadToPath: loadToPath, + uploader: u, + pkgID: pkgID, + targetSpace: target.Space, + // distMutex: distMutex, + loadToSpaces: loadToSpaces, + loadToPath: loadToPath, }, nil } @@ -123,122 +115,109 @@ func (u *Uploader) BeginUpdate(pkgID cdssdk.PackageID, affinity cdssdk.UserSpace // 1. 选择设置了亲和性的节点 // 2. 从与当前客户端相同地域的节点中随机选一个 // 3. 没有的话从所有节点选择延迟最低的节点 -func (w *Uploader) chooseUploadStorage(storages []UploadStorageInfo, stgAffinity cdssdk.StorageID) UploadStorageInfo { - if stgAffinity > 0 { - aff, ok := lo.Find(storages, func(storage UploadStorageInfo) bool { return storage.Storage.Storage.StorageID == stgAffinity }) +func (w *Uploader) chooseUploadStorage(spaces []UploadSpaceInfo, spaceAffinity types.UserSpaceID) UploadSpaceInfo { + if spaceAffinity > 0 { + aff, ok := lo.Find(spaces, func(space UploadSpaceInfo) bool { return space.Space.UserSpace.UserSpaceID == spaceAffinity }) if ok { return aff } } - sameLocationStorages := lo.Filter(storages, func(e UploadStorageInfo, i int) bool { return e.IsSameLocation }) + sameLocationStorages := lo.Filter(spaces, func(e UploadSpaceInfo, i int) bool { return e.IsSameLocation }) if len(sameLocationStorages) > 0 { return sameLocationStorages[rand.Intn(len(sameLocationStorages))] } // 选择延迟最低的节点 - storages = sort2.Sort(storages, func(e1, e2 UploadStorageInfo) int { return sort2.Cmp(e1.Delay, e2.Delay) }) + spaces = sort2.Sort(spaces, func(e1, e2 UploadSpaceInfo) int { return sort2.Cmp(e1.Delay, e2.Delay) }) - return storages[0] + return spaces[0] } -func (u *Uploader) BeginCreateLoad(userID cdssdk.UserID, bktID cdssdk.BucketID, pkgName string, loadTo []cdssdk.StorageID, loadToPath []string) (*CreateLoadUploader, error) { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new coordinator client: %w", err) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - getStgs := u.stgMeta.GetMany(loadTo) +func (u *Uploader) BeginCreateLoad(bktID types.BucketID, pkgName string, loadTo []types.UserSpaceID, loadToPath []string) (*CreateLoadUploader, error) { + getSpaces := u.spaceMeta.GetMany(loadTo) - targetStgs := make([]stgmod.StorageDetail, len(loadTo)) - for i, stg := range getStgs { + spacesStgs := make([]types.UserSpaceDetail, len(loadTo)) + for i, stg := range getSpaces { if stg == nil { return nil, fmt.Errorf("storage %v not found", loadTo[i]) } - targetStgs[i] = *stg + spacesStgs[i] = *stg } - createPkg, err := coorCli.CreatePackage(coormq.NewCreatePackage(userID, bktID, pkgName)) + pkg, err := u.db.Package().Create(u.db.DefCtx(), bktID, pkgName) if err != nil { return nil, fmt.Errorf("create package: %w", err) } - reqBld := reqbuilder.NewBuilder() - for _, stg := range targetStgs { - reqBld.Shard().Buzy(stg.Storage.StorageID) - reqBld.Storage().Buzy(stg.Storage.StorageID) - } - lock, err := reqBld.MutexLock(u.distlock) - if err != nil { - return nil, fmt.Errorf("acquire distlock: %w", err) - } + // TODO2 加锁 + // reqBld := reqbuilder.NewBuilder() + // for _, stg := range spacesStgs { + // reqBld.Shard().Buzy(stg.Storage.StorageID) + // reqBld.Storage().Buzy(stg.Storage.StorageID) + // } + // lock, err := reqBld.MutexLock(u.distlock) + // if err != nil { + // return nil, fmt.Errorf("acquire distlock: %w", err) + // } return &CreateLoadUploader{ - pkg: createPkg.Package, - userID: userID, - targetSpaces: targetStgs, + pkg: pkg, + targetSpaces: spacesStgs, loadRoots: loadToPath, uploader: u, - distlock: lock, + // distlock: lock, }, nil } -func (u *Uploader) UploadPart(userID cdssdk.UserID, objID cdssdk.ObjectID, index int, stream io.Reader) error { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() +func (u *Uploader) UploadPart(objID types.ObjectID, index int, stream io.Reader) error { + detail, err := u.db.Object().GetDetail(u.db.DefCtx(), objID) if err != nil { - return fmt.Errorf("new coordinator client: %w", err) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - details, err := coorCli.GetObjectDetails(coormq.ReqGetObjectDetails([]cdssdk.ObjectID{objID})) - if err != nil { - return err - } - - if details.Objects[0] == nil { - return fmt.Errorf("object %v not found", objID) + return fmt.Errorf("getting object detail: %w", err) } - objDe := details.Objects[0] - _, ok := objDe.Object.Redundancy.(*cdssdk.MultipartUploadRedundancy) + objDe := detail + _, ok := objDe.Object.Redundancy.(*types.MultipartUploadRedundancy) if !ok { return fmt.Errorf("object %v is not a multipart upload", objID) } - var stg stgmod.StorageDetail + var space types.UserSpaceDetail if len(objDe.Blocks) > 0 { - cstg := u.stgMeta.Get(objDe.Blocks[0].StorageID) + cstg := u.spaceMeta.Get(objDe.Blocks[0].UserSpaceID) if cstg == nil { - return fmt.Errorf("storage %v not found", objDe.Blocks[0].StorageID) + return fmt.Errorf("space %v not found", objDe.Blocks[0].UserSpaceID) } - stg = *cstg + space = *cstg } else { - getUserStgsResp, err := coorCli.GetUserStorageDetails(coormq.ReqGetUserStorageDetails(userID)) + spaceIDs, err := u.db.UserSpace().GetAllIDs(u.db.DefCtx()) if err != nil { - return fmt.Errorf("getting user storages: %w", err) + return fmt.Errorf("getting user space ids: %w", err) } + spaces := u.spaceMeta.GetMany(spaceIDs) + spaces = lo2.RemoveAllDefault(spaces) + cons := u.connectivity.GetAll() - var userStgs []UploadStorageInfo - for _, stg := range getUserStgsResp.Storages { - if stg.MasterHub == nil { + var userStgs []UploadSpaceInfo + for _, space := range spaces { + if space.MasterHub == nil { continue } delay := time.Duration(math.MaxInt64) - con, ok := cons[stg.MasterHub.HubID] + con, ok := cons[space.MasterHub.HubID] if ok && con.Latency != nil { delay = *con.Latency } - userStgs = append(userStgs, UploadStorageInfo{ - Storage: stg, + userStgs = append(userStgs, UploadSpaceInfo{ + Space: *space, Delay: delay, - IsSameLocation: stg.MasterHub.LocationID == stgglb.Local.LocationID, + IsSameLocation: space.MasterHub.LocationID == publics.Local.LocationID, }) } @@ -246,19 +225,20 @@ func (u *Uploader) UploadPart(userID cdssdk.UserID, objID cdssdk.ObjectID, index return fmt.Errorf("user no available storages") } - stg = u.chooseUploadStorage(userStgs, 0).Storage + space = u.chooseUploadStorage(userStgs, 0).Space } - lock, err := reqbuilder.NewBuilder().Shard().Buzy(stg.Storage.StorageID).MutexLock(u.distlock) - if err != nil { - return fmt.Errorf("acquire distlock: %w", err) - } - defer lock.Unlock() + // TODO2 加锁 + // lock, err := reqbuilder.NewBuilder().Shard().Buzy(space.Storage.StorageID).MutexLock(u.distlock) + // if err != nil { + // return fmt.Errorf("acquire distlock: %w", err) + // } + // defer lock.Unlock() ft := ioswitch2.NewFromTo() fromDrv, hd := ioswitch2.NewFromDriver(ioswitch2.RawStream()) ft.AddFrom(fromDrv). - AddTo(ioswitch2.NewToShardStore(*stg.MasterHub, stg, ioswitch2.RawStream(), "shard")) + AddTo(ioswitch2.NewToShardStore(*space.MasterHub, space, ioswitch2.RawStream(), "shard")) plans := exec.NewPlanBuilder() err = parser.Parse(ft, plans) @@ -276,12 +256,14 @@ func (u *Uploader) UploadPart(userID cdssdk.UserID, objID cdssdk.ObjectID, index } shardInfo := ret["shard"].(*ops2.ShardInfoValue) - _, err = coorCli.AddMultipartUploadPart(coormq.ReqAddMultipartUploadPart(userID, objID, stgmod.ObjectBlock{ - ObjectID: objID, - Index: index, - StorageID: stg.Storage.StorageID, - FileHash: shardInfo.Hash, - Size: shardInfo.Size, - })) + err = u.db.DoTx(func(tx db.SQLContext) error { + return u.db.Object().AppendPart(tx, types.ObjectBlock{ + ObjectID: objID, + Index: index, + UserSpaceID: space.UserSpace.UserSpaceID, + FileHash: shardInfo.Hash, + Size: shardInfo.Size, + }) + }) return err }