diff --git a/client/internal/db/db.go b/client/internal/db/db.go index 7db69a0..de6df2d 100644 --- a/client/internal/db/db.go +++ b/client/internal/db/db.go @@ -54,6 +54,12 @@ func DoTx11[T any, R any](db *DB, do func(tx SQLContext, t T) (R, error), t T) ( return ret, err } +func DoTx20[T1 any, T2 any](db *DB, do func(tx SQLContext, t1 T1, t2 T2) error, t1 T1, t2 T2) error { + return db.db.Transaction(func(tx *gorm.DB) error { + return do(SQLContext{tx}, t1, t2) + }) +} + func DoTx21[T1 any, T2 any, R any](db *DB, do func(tx SQLContext, t1 T1, t2 T2) (R, error), t1 T1, t2 T2) (R, error) { var ret R err := db.db.Transaction(func(tx *gorm.DB) error { diff --git a/client/internal/db/package.go b/client/internal/db/package.go index 8e54b9b..8181e05 100644 --- a/client/internal/db/package.go +++ b/client/internal/db/package.go @@ -162,7 +162,7 @@ func (*PackageDB) GetByFullName(ctx SQLContext, bucketName string, packageName s return ret, err } -func (db *PackageDB) Create(ctx SQLContext, bucketID types.BucketID, name string) (types.Package, error) { +func (db *PackageDB) Create(ctx SQLContext, bucketID types.BucketID, name string, createTime time.Time) (types.Package, error) { var packageID int64 err := ctx.Table("Package"). Select("PackageID"). @@ -176,7 +176,7 @@ func (db *PackageDB) Create(ctx SQLContext, bucketID types.BucketID, name string return types.Package{}, gorm.ErrDuplicatedKey } - newPackage := types.Package{Name: name, BucketID: bucketID, CreateTime: time.Now()} + newPackage := types.Package{Name: name, BucketID: bucketID, CreateTime: createTime} if err := ctx.Create(&newPackage).Error; err != nil { return types.Package{}, fmt.Errorf("insert package failed, err: %w", err) } @@ -301,7 +301,7 @@ func (db *PackageDB) TryCreateAll(ctx SQLContext, bktName string, pkgName string return types.Package{}, fmt.Errorf("get package by name: %w", err) } - pkg, err = db.Create(ctx, bkt.BucketID, pkgName) + pkg, err = db.Create(ctx, bkt.BucketID, pkgName, time.Now()) if err != nil { return types.Package{}, fmt.Errorf("create package: %w", err) } diff --git a/client/internal/http/user_space.go b/client/internal/http/user_space.go index a24d28a..8ec1a09 100644 --- a/client/internal/http/user_space.go +++ b/client/internal/http/user_space.go @@ -50,8 +50,7 @@ func (s *UserSpaceService) CreatePackage(ctx *gin.Context) { return } - pkg, err := s.svc.UserSpaceSvc().UserSpaceCreatePackage( - req.BucketID, req.Name, req.UserSpaceID, req.Path, req.SpaceAffinity) + pkg, err := s.svc.Uploader.UserSpaceUpload(req.UserSpaceID, req.Path, req.BucketID, req.Name, req.SpaceAffinity) if err != nil { log.Warnf("userspace create package: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("userspace create package: %v", err))) @@ -59,7 +58,7 @@ func (s *UserSpaceService) CreatePackage(ctx *gin.Context) { } ctx.JSON(http.StatusOK, OK(cliapi.UserSpaceCreatePackageResp{ - Package: pkg, + Package: *pkg, })) } diff --git a/client/internal/mount/vfs/fuse_bucket.go b/client/internal/mount/vfs/fuse_bucket.go index bf99714..f34052c 100644 --- a/client/internal/mount/vfs/fuse_bucket.go +++ b/client/internal/mount/vfs/fuse_bucket.go @@ -166,7 +166,7 @@ func (r *FuseBucket) NewDir(ctx context.Context, name string) (fuse.FsDir, error return fmt.Errorf("get bucket: %v", err) } - _, err = db.Package().Create(tx, bkt.BucketID, name) + _, err = db.Package().Create(tx, bkt.BucketID, name, time.Now()) if err != nil { return fmt.Errorf("create package: %v", err) } diff --git a/client/internal/services/package.go b/client/internal/services/package.go index 04cbb48..3d6756f 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -2,6 +2,7 @@ package services import ( "fmt" + "time" "gitlink.org.cn/cloudream/common/pkgs/logger" @@ -34,7 +35,7 @@ func (svc *PackageService) GetBucketPackages(bucketID types.BucketID) ([]types.P } func (svc *PackageService) Create(bucketID types.BucketID, name string) (types.Package, error) { - pkg, err := svc.DB.Package().Create(svc.DB.DefCtx(), bucketID, name) + pkg, err := svc.DB.Package().Create(svc.DB.DefCtx(), bucketID, name, time.Now()) if err != nil { return types.Package{}, err } @@ -72,7 +73,7 @@ func (svc *PackageService) Clone(packageID types.PackageID, bucketID types.Bucke err := svc.DB.DoTx(func(tx db.SQLContext) error { var err error - pkg, err = svc.DB.Package().Create(tx, bucketID, name) + pkg, err = svc.DB.Package().Create(tx, bucketID, name, time.Now()) if err != nil { return fmt.Errorf("creating package: %w", err) } diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index 7ecb93d..cfa665a 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -115,42 +115,3 @@ func (svc *UserSpaceService) LoadPackage(packageID clitypes.PackageID, userspace return nil } - -// 请求节点启动从UserSpace中上传文件的任务。会返回节点ID和任务ID -func (svc *UserSpaceService) UserSpaceCreatePackage(bucketID clitypes.BucketID, name string, userspaceID clitypes.UserSpaceID, path string, userspaceAffinity clitypes.UserSpaceID) (clitypes.Package, error) { - // coorCli, err := stgglb.CoordinatorMQPool.Acquire() - // if err != nil { - // return cdssdk.Package{}, fmt.Errorf("new coordinator client: %w", err) - // } - // defer stgglb.CoordinatorMQPool.Release(coorCli) - - // stgResp, err := coorCli.GetUserSpaceDetails(coormq.ReqGetUserSpaceDetails([]cdssdk.UserSpaceID{userspaceID})) - // if err != nil { - // return cdssdk.Package{}, fmt.Errorf("getting userspace info: %w", err) - // } - - // spaceDetail := svc.UserSpaceMeta.Get(userspaceID) - // if spaceDetail == nil { - // return cdssdk.Package{}, fmt.Errorf("userspace not found: %d", userspaceID) - // } - - // if spaceDetail.UserSpace.ShardStore == nil { - // return cdssdk.Package{}, fmt.Errorf("shard userspace is not enabled") - // } - - // hubCli, err := stgglb.HubMQPool.Acquire(spaceDetail.MasterHub.HubID) - // if err != nil { - // return cdssdk.Package{}, fmt.Errorf("new hub client: %w", err) - // } - // defer stgglb.HubMQPool.Release(hubCli) - - // createResp, err := hubCli.UserSpaceCreatePackage(hubmq.ReqUserSpaceCreatePackage(bucketID, name, userspaceID, path, userspaceAffinity)) - // if err != nil { - // return cdssdk.Package{}, err - // } - - // return createResp.Package, nil - - // TODO 待实现 - return clitypes.Package{}, fmt.Errorf("not implemented") -} diff --git a/client/internal/uploader/uploader.go b/client/internal/uploader/uploader.go index 2a749cb..dc98b04 100644 --- a/client/internal/uploader/uploader.go +++ b/client/internal/uploader/uploader.go @@ -152,7 +152,7 @@ func (u *Uploader) BeginCreateLoad(bktID clitypes.BucketID, pkgName string, load return clitypes.Package{}, err } - return u.db.Package().Create(u.db.DefCtx(), bktID, pkgName) + return u.db.Package().Create(u.db.DefCtx(), bktID, pkgName, time.Now()) }) if err != nil { return nil, fmt.Errorf("create package: %w", err) diff --git a/client/internal/uploader/user_space_upload.go b/client/internal/uploader/user_space_upload.go new file mode 100644 index 0000000..0f254e3 --- /dev/null +++ b/client/internal/uploader/user_space_upload.go @@ -0,0 +1,184 @@ +package uploader + +import ( + "context" + "fmt" + "math" + "strings" + "time" + + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" + clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator" + hubmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/hub" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" + cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" +) + +func (u *Uploader) UserSpaceUpload(userSpaceID clitypes.UserSpaceID, rootPath string, targetBktID clitypes.BucketID, newPkgName string, uploadAffinity clitypes.UserSpaceID) (*clitypes.Package, error) { + srcSpace := u.spaceMeta.Get(userSpaceID) + if srcSpace == nil { + return nil, fmt.Errorf("user space %d not found", userSpaceID) + } + if srcSpace.MasterHub == nil { + return nil, fmt.Errorf("master hub not found for user space %d", userSpaceID) + } + + pkg, err := db.DoTx01(u.db, func(tx db.SQLContext) (clitypes.Package, error) { + _, err := u.db.Bucket().GetByID(tx, targetBktID) + if err != nil { + return clitypes.Package{}, err + } + + return u.db.Package().Create(tx, targetBktID, newPkgName, time.Now()) + }) + if err != nil { + return nil, fmt.Errorf("creating package: %w", err) + } + delPkg := func() { + u.db.Package().Delete(u.db.DefCtx(), pkg.PackageID) + } + + spaceIDs, err := u.db.UserSpace().GetAllIDs(u.db.DefCtx()) + if err != nil { + delPkg() + return nil, fmt.Errorf("getting user space ids: %w", err) + } + + spaceDetails := u.spaceMeta.GetMany(spaceIDs) + spaceDetails = lo.Filter(spaceDetails, func(e *clitypes.UserSpaceDetail, i int) bool { + return e != nil && e.MasterHub != nil && e.UserSpace.ShardStore != nil + }) + + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + delPkg() + return nil, fmt.Errorf("acquiring coordinator mq client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + resp, err := coorCli.GetHubConnectivities(coordinator.ReqGetHubConnectivities([]cortypes.HubID{srcSpace.MasterHub.HubID})) + if err != nil { + delPkg() + return nil, fmt.Errorf("getting hub connectivities: %w", err) + } + + cons := make(map[cortypes.HubID]cortypes.HubConnectivity) + for _, c := range resp.Connectivities { + cons[c.ToHubID] = c + } + + var uploadSpaces []UploadSpaceInfo + for _, space := range spaceDetails { + if space.MasterHub == nil { + continue + } + + latency := time.Duration(math.MaxInt64) + + con, ok := cons[space.MasterHub.HubID] + if ok && con.Latency != nil { + latency = time.Duration(*con.Latency * float32(time.Millisecond)) + } + + uploadSpaces = append(uploadSpaces, UploadSpaceInfo{ + Space: *space, + Delay: latency, + IsSameLocation: space.MasterHub.LocationID == srcSpace.MasterHub.LocationID, + }) + } + + if len(uploadSpaces) == 0 { + delPkg() + return nil, fmt.Errorf("user no available userspaces") + } + + targetSapce := u.chooseUploadStorage(uploadSpaces, uploadAffinity) + + srcHubCli, err := stgglb.HubMQPool.Acquire(srcSpace.MasterHub.HubID) + if err != nil { + delPkg() + return nil, fmt.Errorf("acquiring source hub mq client: %w", err) + } + defer stgglb.HubMQPool.Release(srcHubCli) + + listAllResp, err := srcHubCli.PublicStoreListAll(&hubmq.PublicStoreListAll{ + UserSpace: *srcSpace, + Path: rootPath, + }) + if err != nil { + delPkg() + return nil, fmt.Errorf("listing public store: %w", err) + } + + adds, err := u.uploadFromPublicStore(srcSpace, &targetSapce.Space, listAllResp.Entries, rootPath) + if err != nil { + delPkg() + return nil, fmt.Errorf("uploading from public store: %w", err) + } + + _, err = db.DoTx21(u.db, u.db.Object().BatchAdd, pkg.PackageID, adds) + if err != nil { + delPkg() + return nil, fmt.Errorf("adding objects: %w", err) + } + + return &pkg, nil +} + +func (u *Uploader) uploadFromPublicStore(srcSpace *clitypes.UserSpaceDetail, targetSpace *clitypes.UserSpaceDetail, entries []types.PublicStoreEntry, rootPath string) ([]db.AddObjectEntry, error) { + ft := ioswitch2.FromTo{} + + for _, e := range entries { + // 可以考虑增加一个配置项来控制是否上传空目录 + if e.IsDir { + continue + } + + ft.AddFrom(ioswitch2.NewFromPublicStore(*srcSpace.MasterHub, *srcSpace, e.Path)) + ft.AddTo(ioswitch2.NewToShardStore(*targetSpace.MasterHub, *targetSpace, ioswitch2.RawStream(), e.Path)) + } + + plans := exec.NewPlanBuilder() + err := parser.Parse(ft, plans) + if err != nil { + return nil, fmt.Errorf("parsing plan: %w", err) + } + + exeCtx := exec.NewExecContext() + exec.SetValueByType(exeCtx, u.stgPool) + ret, err := plans.Execute(exeCtx).Wait(context.Background()) + if err != nil { + return nil, fmt.Errorf("executing plan: %w", err) + } + + cleanRoot := strings.TrimSuffix(rootPath, clitypes.ObjectPathSeparator) + + adds := make([]db.AddObjectEntry, 0, len(ret)) + for _, e := range entries { + if e.IsDir { + continue + } + pat := strings.TrimPrefix(e.Path, cleanRoot+clitypes.ObjectPathSeparator) + if pat == cleanRoot { + pat = clitypes.BaseName(e.Path) + } + + info := ret[e.Path].(*ops2.ShardInfoValue) + adds = append(adds, db.AddObjectEntry{ + Path: pat, + Size: info.Size, + FileHash: info.Hash, + CreateTime: time.Now(), + UserSpaceIDs: []clitypes.UserSpaceID{targetSpace.UserSpace.UserSpaceID}, + }) + } + + return adds, nil +} diff --git a/common/pkgs/ioswitch2/fromto.go b/common/pkgs/ioswitch2/fromto.go index 9fba25e..ea523dd 100644 --- a/common/pkgs/ioswitch2/fromto.go +++ b/common/pkgs/ioswitch2/fromto.go @@ -3,7 +3,7 @@ package ioswitch2 import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/math2" - "gitlink.org.cn/cloudream/jcs-pub/client/types" + clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) @@ -69,9 +69,9 @@ type FromTos []FromTo type FromTo struct { // 如果输入或者输出用到了EC编码的流,则需要提供EC参数。 - ECParam *types.ECRedundancy + ECParam *clitypes.ECRedundancy // 同上 - SegmentParam *types.SegmentRedundancy + SegmentParam *clitypes.SegmentRedundancy Froms []From Toes []To } @@ -110,13 +110,13 @@ func (f *FromDriver) GetStreamIndex() StreamIndex { } type FromShardstore struct { - FileHash types.FileHash + FileHash clitypes.FileHash Hub cortypes.Hub - Space types.UserSpaceDetail + Space clitypes.UserSpaceDetail StreamIndex StreamIndex } -func NewFromShardstore(fileHash types.FileHash, hub cortypes.Hub, space types.UserSpaceDetail, strIdx StreamIndex) *FromShardstore { +func NewFromShardstore(fileHash clitypes.FileHash, hub cortypes.Hub, space clitypes.UserSpaceDetail, strIdx StreamIndex) *FromShardstore { return &FromShardstore{ FileHash: fileHash, Hub: hub, @@ -129,6 +129,26 @@ func (f *FromShardstore) GetStreamIndex() StreamIndex { return f.StreamIndex } +type FromPublicStore struct { + Hub cortypes.Hub + Space clitypes.UserSpaceDetail + Path string +} + +func NewFromPublicStore(hub cortypes.Hub, space clitypes.UserSpaceDetail, path string) *FromPublicStore { + return &FromPublicStore{ + Hub: hub, + Space: space, + Path: path, + } +} + +func (f *FromPublicStore) GetStreamIndex() StreamIndex { + return StreamIndex{ + Type: StreamIndexRaw, + } +} + type ToDriver struct { Handle *exec.DriverReadStream StreamIndex StreamIndex @@ -162,13 +182,13 @@ func (t *ToDriver) GetRange() math2.Range { type ToShardStore struct { Hub cortypes.Hub - Space types.UserSpaceDetail + Space clitypes.UserSpaceDetail StreamIndex StreamIndex Range math2.Range FileHashStoreKey string } -func NewToShardStore(hub cortypes.Hub, space types.UserSpaceDetail, strIdx StreamIndex, fileHashStoreKey string) *ToShardStore { +func NewToShardStore(hub cortypes.Hub, space clitypes.UserSpaceDetail, strIdx StreamIndex, fileHashStoreKey string) *ToShardStore { return &ToShardStore{ Hub: hub, Space: space, @@ -177,7 +197,7 @@ func NewToShardStore(hub cortypes.Hub, space types.UserSpaceDetail, strIdx Strea } } -func NewToShardStoreWithRange(hub cortypes.Hub, space types.UserSpaceDetail, streamIndex StreamIndex, fileHashStoreKey string, rng math2.Range) *ToShardStore { +func NewToShardStoreWithRange(hub cortypes.Hub, space clitypes.UserSpaceDetail, streamIndex StreamIndex, fileHashStoreKey string, rng math2.Range) *ToShardStore { return &ToShardStore{ Hub: hub, Space: space, @@ -197,11 +217,11 @@ func (t *ToShardStore) GetRange() math2.Range { type LoadToPublic struct { Hub cortypes.Hub - Space types.UserSpaceDetail + Space clitypes.UserSpaceDetail ObjectPath string } -func NewLoadToPublic(hub cortypes.Hub, space types.UserSpaceDetail, objectPath string) *LoadToPublic { +func NewLoadToPublic(hub cortypes.Hub, space clitypes.UserSpaceDetail, objectPath string) *LoadToPublic { return &LoadToPublic{ Hub: hub, Space: space, diff --git a/common/pkgs/ioswitch2/ops2/public_store.go b/common/pkgs/ioswitch2/ops2/public_store.go index 06227fd..235ed31 100644 --- a/common/pkgs/ioswitch2/ops2/public_store.go +++ b/common/pkgs/ioswitch2/ops2/public_store.go @@ -2,30 +2,78 @@ package ops2 import ( "fmt" + "io" + "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/utils/io2" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" ) func init() { - exec.UseOp[*PublicLoad]() + exec.UseOp[*PublicWrite]() + exec.UseOp[*PublicRead]() } -type PublicLoad struct { +type PublicRead struct { + Output exec.VarID + UserSpace clitypes.UserSpaceDetail + ObjectPath string +} + +func (o *PublicRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + logger. + WithField("Output", o.Output). + WithField("UserSpace", o.UserSpace). + WithField("ObjectPath", o.ObjectPath). + Debug("public read") + defer logger.Debug("public read end") + + stgPool, err := exec.GetValueByType[*pool.Pool](ctx) + if err != nil { + return fmt.Errorf("getting storage pool: %w", err) + } + + store, err := stgPool.GetPublicStore(&o.UserSpace) + if err != nil { + return fmt.Errorf("getting public store of storage %v: %w", o.UserSpace, err) + } + + stream, err := store.Read(o.ObjectPath) + if err != nil { + return fmt.Errorf("reading object %v: %w", o.ObjectPath, err) + } + + fut := future.NewSetVoid() + output := &exec.StreamValue{ + Stream: io2.AfterReadClosed(stream, func(closer io.ReadCloser) { + fut.SetVoid() + }), + } + + e.PutVar(o.Output, output) + return fut.Wait(ctx.Context) +} + +func (o *PublicRead) String() string { + return fmt.Sprintf("PublicRead %v:%v -> %v", o.UserSpace, o.ObjectPath, o.Output) +} + +type PublicWrite struct { Input exec.VarID UserSpace clitypes.UserSpaceDetail ObjectPath string } -func (o *PublicLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error { +func (o *PublicWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { logger. WithField("Input", o.Input). - Debugf("load file to public store") - defer logger.Debugf("load file to public store finished") + Debugf("write file to public store") + defer logger.Debugf("write file to public store finished") stgPool, err := exec.GetValueByType[*pool.Pool](ctx) if err != nil { @@ -46,19 +94,57 @@ func (o *PublicLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error { return store.Write(o.ObjectPath, input.Stream) } -func (o *PublicLoad) String() string { - return fmt.Sprintf("PublicLoad %v -> %v:%v", o.Input, o.UserSpace, o.ObjectPath) +func (o *PublicWrite) String() string { + return fmt.Sprintf("PublicWrite %v -> %v:%v", o.Input, o.UserSpace, o.ObjectPath) +} + +type PublicReadNode struct { + dag.NodeBase + From ioswitch2.From + UserSpace clitypes.UserSpaceDetail + ObjectPath string +} + +func (b *GraphNodeBuilder) NewPublicRead(from ioswitch2.From, userSpace clitypes.UserSpaceDetail, objPath string) *PublicReadNode { + node := &PublicReadNode{ + From: from, + UserSpace: userSpace, + ObjectPath: objPath, + } + b.AddNode(node) + + node.OutputStreams().Init(node, 1) + return node +} + +func (t *PublicReadNode) GetFrom() ioswitch2.From { + return t.From +} + +func (t *PublicReadNode) Output() dag.StreamOutputSlot { + return dag.StreamOutputSlot{ + Node: t, + Index: 0, + } +} + +func (t *PublicReadNode) GenerateOp() (exec.Op, error) { + return &PublicRead{ + Output: t.Output().Var().VarID, + UserSpace: t.UserSpace, + ObjectPath: t.ObjectPath, + }, nil } -type PublicLoadNode struct { +type PublicWriteNode struct { dag.NodeBase To ioswitch2.To UserSpace clitypes.UserSpaceDetail ObjectPath string } -func (b *GraphNodeBuilder) NewPublicLoad(to ioswitch2.To, userSpace clitypes.UserSpaceDetail, objPath string) *PublicLoadNode { - node := &PublicLoadNode{ +func (b *GraphNodeBuilder) NewPublicWrite(to ioswitch2.To, userSpace clitypes.UserSpaceDetail, objPath string) *PublicWriteNode { + node := &PublicWriteNode{ To: to, UserSpace: userSpace, ObjectPath: objPath, @@ -69,23 +155,23 @@ func (b *GraphNodeBuilder) NewPublicLoad(to ioswitch2.To, userSpace clitypes.Use return node } -func (t *PublicLoadNode) GetTo() ioswitch2.To { +func (t *PublicWriteNode) GetTo() ioswitch2.To { return t.To } -func (t *PublicLoadNode) SetInput(input *dag.StreamVar) { +func (t *PublicWriteNode) SetInput(input *dag.StreamVar) { input.To(t, 0) } -func (t *PublicLoadNode) Input() dag.StreamInputSlot { +func (t *PublicWriteNode) Input() dag.StreamInputSlot { return dag.StreamInputSlot{ Node: t, Index: 0, } } -func (t *PublicLoadNode) GenerateOp() (exec.Op, error) { - return &PublicLoad{ +func (t *PublicWriteNode) GenerateOp() (exec.Op, error) { + return &PublicWrite{ Input: t.InputStreams().Get(0).VarID, UserSpace: t.UserSpace, ObjectPath: t.ObjectPath, diff --git a/common/pkgs/ioswitch2/parser/gen/generator.go b/common/pkgs/ioswitch2/parser/gen/generator.go index 6b73dac..b791028 100644 --- a/common/pkgs/ioswitch2/parser/gen/generator.go +++ b/common/pkgs/ioswitch2/parser/gen/generator.go @@ -336,6 +336,24 @@ func buildFromNode(ctx *state.GenerateState, f ioswitch2.From) (ops2.FromNode, e return n, nil + case *ioswitch2.FromPublicStore: + // TODO 可以考虑支持设置读取范围 + n := ctx.DAG.NewPublicRead(f, f.Space, f.Path) + switch addr := f.Hub.Address.(type) { + case *cortypes.HttpAddressInfo: + n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: f.Hub}) + n.Env().Pinned = true + + case *cortypes.GRPCAddressInfo: + n.Env().ToEnvWorker(&ioswitch2.HubWorker{Hub: f.Hub, Address: *addr}) + n.Env().Pinned = true + + default: + return nil, fmt.Errorf("unsupported node address type %T", addr) + } + + return n, nil + default: return nil, fmt.Errorf("unsupported from type %T", f) } @@ -362,7 +380,7 @@ func buildToNode(ctx *state.GenerateState, t ioswitch2.To) (ops2.ToNode, error) return n, nil case *ioswitch2.LoadToPublic: - n := ctx.DAG.NewPublicLoad(t, t.Space, t.ObjectPath) + n := ctx.DAG.NewPublicWrite(t, t.Space, t.ObjectPath) if err := setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil { return nil, err diff --git a/common/pkgs/mq/hub/server.go b/common/pkgs/mq/hub/server.go index b9780c5..029a3a2 100644 --- a/common/pkgs/mq/hub/server.go +++ b/common/pkgs/mq/hub/server.go @@ -8,7 +8,7 @@ import ( ) type Service interface { - // UserSpaceService + UserSpaceService CacheService diff --git a/common/pkgs/mq/hub/storage.go b/common/pkgs/mq/hub/storage.go deleted file mode 100644 index 71aadcd..0000000 --- a/common/pkgs/mq/hub/storage.go +++ /dev/null @@ -1,48 +0,0 @@ -package hub - -/* -import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" -) - -type UserSpaceService interface { - UserSpaceCreatePackage(msg *UserSpaceCreatePackage) (*UserSpaceCreatePackageResp, *mq.CodeMessage) -} - -// 启动从UserSpace上传Package的任务 -var _ = Register(Service.UserSpaceCreatePackage) - -type UserSpaceCreatePackage struct { - mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - BucketID cdssdk.BucketID `json:"bucketID"` - Name string `json:"name"` - UserSpaceID cdssdk.UserSpaceID `json:"userspaceID"` - Path string `json:"path"` - UserSpaceAffinity cdssdk.UserSpaceID `json:"userspaceAffinity"` -} -type UserSpaceCreatePackageResp struct { - mq.MessageBodyBase - Package cdssdk.Package `json:"package"` -} - -func ReqUserSpaceCreatePackage(userID cdssdk.UserID, bucketID cdssdk.BucketID, name string, userspaceID cdssdk.UserSpaceID, path string, stgAffinity cdssdk.UserSpaceID) *UserSpaceCreatePackage { - return &UserSpaceCreatePackage{ - UserID: userID, - BucketID: bucketID, - Name: name, - UserSpaceID: userspaceID, - Path: path, - UserSpaceAffinity: stgAffinity, - } -} -func RespUserSpaceCreatePackage(pkg cdssdk.Package) *UserSpaceCreatePackageResp { - return &UserSpaceCreatePackageResp{ - Package: pkg, - } -} -func (client *Client) UserSpaceCreatePackage(msg *UserSpaceCreatePackage, opts ...mq.RequestOption) (*UserSpaceCreatePackageResp, error) { - return mq.Request(Service.UserSpaceCreatePackage, client.rabbitCli, msg, opts...) -} -*/ diff --git a/common/pkgs/mq/hub/user_space.go b/common/pkgs/mq/hub/user_space.go new file mode 100644 index 0000000..6ffbaeb --- /dev/null +++ b/common/pkgs/mq/hub/user_space.go @@ -0,0 +1,28 @@ +package hub + +import ( + "gitlink.org.cn/cloudream/common/pkgs/mq" + clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + stgtypes "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" +) + +type UserSpaceService interface { + PublicStoreListAll(msg *PublicStoreListAll) (*PublicStoreListAllResp, *mq.CodeMessage) +} + +// 启动从UserSpace上传Package的任务 +var _ = Register(Service.PublicStoreListAll) + +type PublicStoreListAll struct { + mq.MessageBodyBase + UserSpace clitypes.UserSpaceDetail + Path string +} +type PublicStoreListAllResp struct { + mq.MessageBodyBase + Entries []stgtypes.PublicStoreEntry +} + +func (client *Client) PublicStoreListAll(msg *PublicStoreListAll, opts ...mq.RequestOption) (*PublicStoreListAllResp, error) { + return mq.Request(Service.PublicStoreListAll, client.rabbitCli, msg, opts...) +} diff --git a/common/pkgs/storage/local/local.go b/common/pkgs/storage/local/local.go index 8cda82b..42baf46 100644 --- a/common/pkgs/storage/local/local.go +++ b/common/pkgs/storage/local/local.go @@ -31,11 +31,21 @@ func (b *builder) FeatureDesc() types.FeatureDesc { } func (b *builder) CreateShardStore() (types.ShardStore, error) { - return NewShardStore(b.detail) + cred, ok := b.detail.UserSpace.Credential.(*cortypes.LocalCred) + if !ok { + return nil, fmt.Errorf("invalid storage credential type %T for local storage", b.detail.UserSpace.Credential) + } + + return NewShardStore(cred.RootDir, b.detail) } func (b *builder) CreatePublicStore() (types.PublicStore, error) { - return NewPublicStore(b.detail) + cred, ok := b.detail.UserSpace.Credential.(*cortypes.LocalCred) + if !ok { + return nil, fmt.Errorf("invalid storage credential type %T for local storage", b.detail.UserSpace.Credential) + } + + return NewPublicStore(cred.RootDir, b.detail) } func (b *builder) CreateMultiparter() (types.Multiparter, error) { diff --git a/common/pkgs/storage/local/public_store.go b/common/pkgs/storage/local/public_store.go index 443d3b4..9dd007f 100644 --- a/common/pkgs/storage/local/public_store.go +++ b/common/pkgs/storage/local/public_store.go @@ -8,25 +8,30 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) type PublicStore struct { + root string detail *clitypes.UserSpaceDetail } -func NewPublicStore(detail *clitypes.UserSpaceDetail) (*PublicStore, error) { +func NewPublicStore(root string, detail *clitypes.UserSpaceDetail) (*PublicStore, error) { return &PublicStore{ + root: root, detail: detail, }, nil } func (s *PublicStore) Write(objPath string, stream io.Reader) error { - err := os.MkdirAll(filepath.Dir(objPath), 0755) + absObjPath := filepath.Join(s.root, objPath) + + err := os.MkdirAll(filepath.Dir(absObjPath), 0755) if err != nil { return err } - f, err := os.Create(objPath) + f, err := os.Create(absObjPath) if err != nil { return err } @@ -41,7 +46,8 @@ func (s *PublicStore) Write(objPath string, stream io.Reader) error { } func (s *PublicStore) Read(objPath string) (io.ReadCloser, error) { - f, err := os.Open(objPath) + absObjPath := filepath.Join(s.root, objPath) + f, err := os.Open(absObjPath) if err != nil { return nil, err } @@ -49,40 +55,48 @@ func (s *PublicStore) Read(objPath string) (io.ReadCloser, error) { return f, nil } -func (s *PublicStore) List(path string, recursive bool) ([]string, error) { - var pathes []string - if recursive { - err := filepath.WalkDir(path, func(path string, d fs.DirEntry, err error) error { - if err != nil { - return err - } - if d.IsDir() { - return nil - } - - pathes = append(pathes, filepath.ToSlash(path)) - return nil - }) +func (s *PublicStore) ListAll(path string) ([]types.PublicStoreEntry, error) { + absObjPath := filepath.Join(s.root, path) + + var es []types.PublicStoreEntry + err := filepath.WalkDir(absObjPath, func(path string, d fs.DirEntry, err error) error { if err != nil { - return nil, err + return err } - } else { - files, err := os.ReadDir(path) + relaPath, err := filepath.Rel(s.root, path) if err != nil { - return nil, err + return err } - for _, f := range files { - if f.IsDir() { - continue - } - - pathes = append(pathes, filepath.ToSlash(filepath.Join(path, f.Name()))) + if d.IsDir() { + es = append(es, types.PublicStoreEntry{ + Path: filepath.ToSlash(relaPath), + Size: 0, + IsDir: true, + }) + return nil } + info, err := d.Info() + if err != nil { + return err + } + + es = append(es, types.PublicStoreEntry{ + Path: filepath.ToSlash(relaPath), + Size: info.Size(), + IsDir: false, + }) + return nil + }) + if os.IsNotExist(err) { + return nil, nil + } + if err != nil { + return nil, err } - return pathes, nil + return es, nil } func (s *PublicStore) getLogger() logger.Logger { diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go index 2472462..1642d94 100644 --- a/common/pkgs/storage/local/shard_store.go +++ b/common/pkgs/storage/local/shard_store.go @@ -31,8 +31,8 @@ type ShardStore struct { done chan any } -func NewShardStore(detail *clitypes.UserSpaceDetail) (*ShardStore, error) { - absRoot, err := filepath.Abs(detail.UserSpace.ShardStore.Root) +func NewShardStore(root string, detail *clitypes.UserSpaceDetail) (*ShardStore, error) { + absRoot, err := filepath.Abs(filepath.Join(root, detail.UserSpace.ShardStore.BaseDir)) if err != nil { return nil, fmt.Errorf("get abs root: %w", err) } diff --git a/common/pkgs/storage/s3/public_store.go b/common/pkgs/storage/s3/public_store.go index baa41fc..d417aa9 100644 --- a/common/pkgs/storage/s3/public_store.go +++ b/common/pkgs/storage/s3/public_store.go @@ -8,6 +8,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/s3" "gitlink.org.cn/cloudream/common/pkgs/logger" clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" ) type PublicStore struct { @@ -51,20 +52,17 @@ func (s *PublicStore) Read(objPath string) (io.ReadCloser, error) { return resp.Body, nil } -func (s *PublicStore) List(path string, recursive bool) ([]string, error) { +func (s *PublicStore) ListAll(path string) ([]types.PublicStoreEntry, error) { key := path // TODO 待测试 input := &s3.ListObjectsInput{ - Bucket: aws.String(s.Bucket), - Prefix: aws.String(key), - } - - if !recursive { - input.Delimiter = aws.String("/") + Bucket: aws.String(s.Bucket), + Prefix: aws.String(key), + Delimiter: aws.String("/"), } - var pathes []string + var objs []types.PublicStoreEntry var marker *string for { @@ -75,7 +73,11 @@ func (s *PublicStore) List(path string, recursive bool) ([]string, error) { } for _, obj := range resp.Contents { - pathes = append(pathes, *obj.Key) + objs = append(objs, types.PublicStoreEntry{ + Path: *obj.Key, + Size: *obj.Size, + IsDir: false, + }) } if !*resp.IsTruncated { @@ -85,7 +87,7 @@ func (s *PublicStore) List(path string, recursive bool) ([]string, error) { marker = resp.NextMarker } - return pathes, nil + return objs, nil } func (s *PublicStore) getLogger() logger.Logger { diff --git a/common/pkgs/storage/s3/shard_store.go b/common/pkgs/storage/s3/shard_store.go index 53d7873..655a544 100644 --- a/common/pkgs/storage/s3/shard_store.go +++ b/common/pkgs/storage/s3/shard_store.go @@ -52,7 +52,7 @@ func NewShardStore(detail *clitypes.UserSpaceDetail, cli *s3.Client, bkt string, } func (s *ShardStore) Start(ch *types.StorageEventChan) { - s.getLogger().Infof("start, root: %v", s.Detail.UserSpace.ShardStore.Root) + s.getLogger().Infof("start, root: %v", s.Detail.UserSpace.ShardStore.BaseDir) go func() { removeTempTicker := time.NewTicker(time.Minute * 10) @@ -81,7 +81,7 @@ func (s *ShardStore) removeUnusedTempFiles() { for { resp, err := s.cli.ListObjects(context.Background(), &s3.ListObjectsInput{ Bucket: aws.String(s.Bucket), - Prefix: aws.String(JoinKey(s.Detail.UserSpace.ShardStore.Root, TempDir, "/")), + Prefix: aws.String(JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, TempDir, "/")), Marker: marker, }) @@ -223,7 +223,7 @@ func (s *ShardStore) createTempFile() (string, string) { s.lock.Lock() defer s.lock.Unlock() - tmpDir := JoinKey(s.Detail.UserSpace.ShardStore.Root, TempDir) + tmpDir := JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, TempDir) tmpName := os2.GenerateRandomFileName(20) s.workingTempFiles[tmpName] = true @@ -335,7 +335,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { var infos []types.FileInfo - blockDir := JoinKey(s.Detail.UserSpace.ShardStore.Root, BlocksDir) + blockDir := JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, BlocksDir) var marker *string for { @@ -384,7 +384,7 @@ func (s *ShardStore) GC(avaiables []clitypes.FileHash) error { avais[hash] = true } - blockDir := JoinKey(s.Detail.UserSpace.ShardStore.Root, BlocksDir) + blockDir := JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, BlocksDir) var deletes []s3types.ObjectIdentifier var marker *string @@ -454,11 +454,11 @@ func (s *ShardStore) getLogger() logger.Logger { } func (s *ShardStore) GetFileDirFromHash(hash clitypes.FileHash) string { - return JoinKey(s.Detail.UserSpace.ShardStore.Root, BlocksDir, hash.GetHashPrefix(2)) + return JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, BlocksDir, hash.GetHashPrefix(2)) } func (s *ShardStore) GetFilePathFromHash(hash clitypes.FileHash) string { - return JoinKey(s.Detail.UserSpace.ShardStore.Root, BlocksDir, hash.GetHashPrefix(2), string(hash)) + return JoinKey(s.Detail.UserSpace.ShardStore.BaseDir, BlocksDir, hash.GetHashPrefix(2), string(hash)) } var _ types.BypassWrite = (*ShardStore)(nil) diff --git a/common/pkgs/storage/types/public_store.go b/common/pkgs/storage/types/public_store.go index d4106be..ae046eb 100644 --- a/common/pkgs/storage/types/public_store.go +++ b/common/pkgs/storage/types/public_store.go @@ -4,9 +4,17 @@ import ( "io" ) +type PublicStoreEntry struct { + Path string + Size int64 + IsDir bool +} + type PublicStore interface { - Write(objectPath string, stream io.Reader) error - Read(objectPath string) (io.ReadCloser, error) - // 返回指定路径下的所有文件 - List(path string, recursive bool) ([]string, error) + Write(path string, stream io.Reader) error + Read(path string) (io.ReadCloser, error) + // 返回指定路径下的所有文件,文件路径是包含path在内的完整路径。返回结果的第一条一定是路径本身,可能是文件,也可能是目录。 + // 如果路径不存在,那么不会返回错误,而是返回一个空列表。 + // 返回的内容严格按照存储系统的原始结果来,比如当存储系统是一个对象存储时,那么就可能不会包含目录,或者包含用于模拟的以“/”结尾的对象。 + ListAll(path string) ([]PublicStoreEntry, error) } diff --git a/coordinator/types/storage.go b/coordinator/types/storage.go index 848b4c3..5a4da27 100644 --- a/coordinator/types/storage.go +++ b/coordinator/types/storage.go @@ -144,6 +144,6 @@ func (a *S3Type) String() string { } type ShardStoreUserConfig struct { - Root string `json:"root"` + BaseDir string `json:"baseDir"` MaxSize int64 `json:"maxSize"` } diff --git a/coordinator/types/storage_credential.go b/coordinator/types/storage_credential.go index 3a838e7..8af6e41 100644 --- a/coordinator/types/storage_credential.go +++ b/coordinator/types/storage_credential.go @@ -23,6 +23,7 @@ type LocalCred struct { StorageCredential `json:"-"` serder.Metadata `union:"Local"` Type string `json:"type"` + RootDir string `json:"rootDir"` } type MashupCred struct { diff --git a/hub/internal/mq/storage.go b/hub/internal/mq/storage.go deleted file mode 100644 index 9fc80b0..0000000 --- a/hub/internal/mq/storage.go +++ /dev/null @@ -1,59 +0,0 @@ -package mq - -/* -import ( - "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/mq" - stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" - hubmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/hub" - coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator" -) - -func (svc *Service) StorageCreatePackage(msg *hubmq.StorageCreatePackage) (*hubmq.StorageCreatePackageResp, *mq.CodeMessage) { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - logger.Warnf("new coordinator client: %s", err.Error()) - - return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed") - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - pub, err := svc.stgHubs.GetPublicStore(msg.StorageID) - if err != nil { - return nil, mq.Failed(errorcode.OperationFailed, err.Error()) - } - - createResp, err := coorCli.CreatePackage(coormq.NewCreatePackage(msg.UserID, msg.BucketID, msg.Name)) - if err != nil { - return nil, mq.Failed(errorcode.OperationFailed, err.Error()) - } - - uploader, err := svc.uploader.BeginUpdate(msg.UserID, createResp.Package.PackageID, msg.StorageAffinity, nil, nil) - if err != nil { - return nil, mq.Failed(errorcode.OperationFailed, err.Error()) - } - - objPathes, err := pub.List(msg.Path, true) - for _, p := range objPathes { - o, err := pub.Read(p) - if err != nil { - logger.Warnf("read object %s: %v", p, err) - continue - } - - err = uploader.Upload(p, o) - o.Close() - if err != nil { - logger.Warnf("upload object %s: %v", p, err) - continue - } - } - _, err = uploader.Commit() - if err != nil { - return nil, mq.Failed(errorcode.OperationFailed, err.Error()) - } - - return mq.ReplyOK(hubmq.RespStorageCreatePackage(createResp.Package)) -} -*/ diff --git a/hub/internal/mq/user_space.go b/hub/internal/mq/user_space.go new file mode 100644 index 0000000..d17c0ad --- /dev/null +++ b/hub/internal/mq/user_space.go @@ -0,0 +1,23 @@ +package mq + +import ( + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/mq" + hubmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/hub" +) + +func (svc *Service) PublicStoreListAll(msg *hubmq.PublicStoreListAll) (*hubmq.PublicStoreListAllResp, *mq.CodeMessage) { + pub, err := svc.stgPool.GetPublicStore(&msg.UserSpace) + if err != nil { + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) + } + + es, err := pub.ListAll(msg.Path) + if err != nil { + return nil, mq.Failed(errorcode.OperationFailed, err.Error()) + } + + return mq.ReplyOK(&hubmq.PublicStoreListAllResp{ + Entries: es, + }) +}