diff --git a/client/internal/cmdline/put.go b/client/internal/cmdline/put.go index 873d002..1e0cdca 100644 --- a/client/internal/cmdline/put.go +++ b/client/internal/cmdline/put.go @@ -18,7 +18,7 @@ import ( func init() { var nodeID int64 cmd := &cobra.Command{ - Use: "put", + Use: "put [local] [remote]", Short: "Upload files to CDS", Args: func(cmd *cobra.Command, args []string) error { if err := cobra.ExactArgs(2)(cmd, args); err != nil { diff --git a/common/pkgs/cmd/upload_objects.go b/common/pkgs/cmd/upload_objects.go index 4938c29..a5c31ff 100644 --- a/common/pkgs/cmd/upload_objects.go +++ b/common/pkgs/cmd/upload_objects.go @@ -100,7 +100,7 @@ func (t *UploadObjects) Execute(ctx *UploadObjectsContext) (*UploadObjectsResult } if len(userStgs) == 0 { - return nil, fmt.Errorf("user no available nodes") + return nil, fmt.Errorf("user no available storages") } // 给上传节点的IPFS加锁 diff --git a/common/pkgs/db2/bucket.go b/common/pkgs/db2/bucket.go index 7767394..cd5f691 100644 --- a/common/pkgs/db2/bucket.go +++ b/common/pkgs/db2/bucket.go @@ -91,22 +91,22 @@ func (db *BucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketName stri Where("UserBucket.UserID = ? AND Bucket.Name = ?", userID, bucketName). Scan(&bucketID).Error - if err == nil { - return 0, fmt.Errorf("bucket name exists") + if err != nil { + return 0, err } - if !errors.Is(err, gorm.ErrRecordNotFound) { - return 0, err + if bucketID > 0 { + return 0, fmt.Errorf("bucket name exists") } newBucket := cdssdk.Bucket{Name: bucketName, CreatorID: userID} - if err := ctx.Create(&newBucket).Error; err != nil { + if err := ctx.Table("Bucket").Create(&newBucket).Error; err != nil { return 0, fmt.Errorf("insert bucket failed, err: %w", err) } - err = ctx.Exec("insert into UserBucket(UserID,BucketID) values(?,?)", userID, bucketID).Error - if err := ctx.Create(&newBucket).Error; err != nil { - return 0, fmt.Errorf("insert bucket failed, err: %w", err) + err = ctx.Table("UserBucket").Create(&model.UserBucket{UserID: userID, BucketID: newBucket.BucketID}).Error + if err != nil { + return 0, fmt.Errorf("insert user bucket: %w", err) } return newBucket.BucketID, nil diff --git a/common/pkgs/db2/model/model.go b/common/pkgs/db2/model/model.go index c9fe64a..3fcf6bf 100644 --- a/common/pkgs/db2/model/model.go +++ b/common/pkgs/db2/model/model.go @@ -1,12 +1,9 @@ package model import ( - "fmt" - "reflect" "time" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/common/utils/serder" stgmod "gitlink.org.cn/cloudream/storage/common/models" ) @@ -57,38 +54,6 @@ type Object = cdssdk.Object type NodeConnectivity = cdssdk.NodeConnectivity -// 由于Object的Redundancy字段是interface,所以不能直接将查询结果scan成Object,必须先scan成TempObject, -// 再.ToObject()转成Object -type TempObject struct { - cdssdk.Object - Redundancy RedundancyWarpper `db:"Redundancy"` -} - -func (o *TempObject) ToObject() cdssdk.Object { - obj := o.Object - obj.Redundancy = o.Redundancy.Value - return obj -} - -type RedundancyWarpper struct { - Value cdssdk.Redundancy -} - -func (o *RedundancyWarpper) Scan(src interface{}) error { - data, ok := src.([]uint8) - if !ok { - return fmt.Errorf("unknow src type: %v", reflect.TypeOf(data)) - } - - red, err := serder.JSONToObjectEx[cdssdk.Redundancy](data) - if err != nil { - return err - } - - o.Value = red - return nil -} - type ObjectBlock = stgmod.ObjectBlock type Cache struct { diff --git a/common/pkgs/db2/object.go b/common/pkgs/db2/object.go index 9dc20f8..899c491 100644 --- a/common/pkgs/db2/object.go +++ b/common/pkgs/db2/object.go @@ -8,7 +8,6 @@ import ( "gitlink.org.cn/cloudream/common/utils/sort2" "gorm.io/gorm/clause" - "github.com/samber/lo" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db2/model" @@ -24,9 +23,9 @@ func (db *DB) Object() *ObjectDB { } func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Object, error) { - var ret model.TempObject + var ret cdssdk.Object err := ctx.Table("Object").Where("ObjectID = ?", objectID).First(&ret).Error - return ret.ToObject(), err + return ret, err } func (db *ObjectDB) BatchTestObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) (map[cdssdk.ObjectID]bool, error) { @@ -53,13 +52,13 @@ func (db *ObjectDB) BatchGet(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]mod return nil, nil } - var objs []model.TempObject + var objs []cdssdk.Object err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&objs).Error if err != nil { return nil, err } - return lo.Map(objs, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), nil + return objs, nil } func (db *ObjectDB) BatchGetByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.Object, error) { @@ -67,13 +66,13 @@ func (db *ObjectDB) BatchGetByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID return nil, nil } - var objs []model.TempObject + var objs []cdssdk.Object err := ctx.Table("Object").Where("PackageID = ? AND Path IN ?", pkgID, pathes).Find(&objs).Error if err != nil { return nil, err } - return lo.Map(objs, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), nil + return objs, nil } func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, error) { @@ -233,13 +232,13 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] // 创建 Cache caches := make([]model.Cache, len(adds)) - for _, add := range adds { - caches = append(caches, model.Cache{ + for i, add := range adds { + caches[i] = model.Cache{ FileHash: add.FileHash, StorageID: add.StorageID, CreateTime: time.Now(), Priority: 0, - }) + } } if err := ctx.Table("Cache").Create(&caches).Error; err != nil { return nil, fmt.Errorf("batch create caches: %w", err) @@ -335,9 +334,9 @@ func (db *ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error { return nil } - return ctx.Table("Object").Where("ObjectID IN ?", ids).Delete(&model.TempObject{}).Error + return ctx.Table("Object").Where("ObjectID IN ?", ids).Delete(&cdssdk.Object{}).Error } func (db *ObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error { - return ctx.Table("Object").Where("PackageID = ?", packageID).Delete(&model.TempObject{}).Error + return ctx.Table("Object").Where("PackageID = ?", packageID).Delete(&cdssdk.Object{}).Error } diff --git a/common/pkgs/db2/package.go b/common/pkgs/db2/package.go index e181a4a..ee6dec5 100644 --- a/common/pkgs/db2/package.go +++ b/common/pkgs/db2/package.go @@ -116,21 +116,21 @@ func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name strin err := ctx.Table("Package"). Select("PackageID"). Where("Name = ? AND BucketID = ?", name, bucketID). - First(&packageID).Error + Scan(&packageID).Error - if err == nil { - return 0, fmt.Errorf("package with given Name and BucketID already exists") + if err != nil { + return 0, err } - if !errors.Is(err, gorm.ErrRecordNotFound) { - return 0, fmt.Errorf("query Package by PackageName and BucketID failed, err: %w", err) + if packageID != 0 { + return 0, errors.New("package already exists") } - newPackage := model.Package{Name: name, BucketID: bucketID, State: cdssdk.PackageStateNormal} + newPackage := cdssdk.Package{Name: name, BucketID: bucketID, State: cdssdk.PackageStateNormal} if err := ctx.Create(&newPackage).Error; err != nil { return 0, fmt.Errorf("insert package failed, err: %w", err) } - return cdssdk.PackageID(newPackage.PackageID), nil + return newPackage.PackageID, nil } // SoftDelete 设置一个对象被删除,并将相关数据删除 diff --git a/common/pkgs/db2/package_access_stat.go b/common/pkgs/db2/package_access_stat.go index ec0d3db..f5949b9 100644 --- a/common/pkgs/db2/package_access_stat.go +++ b/common/pkgs/db2/package_access_stat.go @@ -4,6 +4,8 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" + "gorm.io/gorm" + "gorm.io/gorm/clause" ) type PackageAccessStatDB struct { @@ -41,10 +43,12 @@ func (*PackageAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.Add return nil } - sql := "INSERT INTO PackageAccessStat(PackageID, StorageID, Counter, Amount) " + - "VALUES(:PackageID, :StorageID, :Counter, 0) ON DUPLICATE KEY UPDATE Counter = Counter + VALUES(Counter)" - - return ctx.Exec(sql, entries).Error + return ctx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "PackageID"}, {Name: "StorageID"}}, + DoUpdates: clause.Assignments(map[string]any{ + "Counter": gorm.Expr("Counter + values(Counter)"), + }), + }).Table("PackageAccessStat").Create(&entries).Error } func (*PackageAccessStatDB) BatchUpdateAmount(ctx SQLContext, pkgIDs []cdssdk.PackageID, historyWeight float64) error { diff --git a/common/pkgs/db2/storage.go b/common/pkgs/db2/storage.go index 1085b66..2ede6b8 100644 --- a/common/pkgs/db2/storage.go +++ b/common/pkgs/db2/storage.go @@ -43,7 +43,7 @@ func (db *StorageDB) GetUserStorages(ctx SQLContext, userID cdssdk.UserID) ([]mo func (db *StorageDB) BatchGetAllStorageIDs(ctx SQLContext, start int, count int) ([]cdssdk.StorageID, error) { var ret []cdssdk.StorageID - err := ctx.Table("Storage").Select("StorageID").Find(ret).Limit(count).Offset(start).Error + err := ctx.Table("Storage").Select("StorageID").Find(&ret).Limit(count).Offset(start).Error return ret, err } @@ -79,6 +79,6 @@ func (db *StorageDB) GetUserStorageByName(ctx SQLContext, userID cdssdk.UserID, func (db *StorageDB) GetHubStorages(ctx SQLContext, hubID cdssdk.NodeID) ([]model.Storage, error) { var stgs []model.Storage - err := ctx.Table("Storage").Select("Storage.*").Find(&stgs, "NodeID = ?", hubID).Error + err := ctx.Table("Storage").Select("Storage.*").Find(&stgs, "MasterHub = ?", hubID).Error return stgs, err } diff --git a/common/pkgs/ioswitch2/ops2/chunked.go b/common/pkgs/ioswitch2/ops2/chunked.go index dfceafa..3c75f6f 100644 --- a/common/pkgs/ioswitch2/ops2/chunked.go +++ b/common/pkgs/ioswitch2/ops2/chunked.go @@ -116,7 +116,7 @@ func (b *GraphNodeBuilder) NewChunkedSplit(chunkSize int) *ChunkedSplitNode { func (t *ChunkedSplitNode) Split(input *dag.Var, cnt int) { t.InputStreams().EnsureSize(1) - input.Connect(t, 0) + input.StreamTo(t, 0) t.OutputStreams().Resize(cnt) for i := 0; i < cnt; i++ { t.OutputStreams().Setup(t, t.Graph().NewVar(), i) @@ -136,11 +136,11 @@ func (t *ChunkedSplitNode) Clear() { return } - t.InputStreams().Get(0).Disconnect(t, 0) + t.InputStreams().Get(0).StreamNotTo(t, 0) t.InputStreams().Resize(0) for _, out := range t.OutputStreams().RawArray() { - out.DisconnectAll() + out.NoInputAllStream() } t.OutputStreams().Resize(0) } @@ -176,7 +176,7 @@ func (b *GraphNodeBuilder) NewChunkedJoin(chunkSize int) *ChunkedJoinNode { func (t *ChunkedJoinNode) AddInput(str *dag.Var) { idx := t.InputStreams().EnlargeOne() - str.Connect(t, idx) + str.StreamTo(t, idx) } func (t *ChunkedJoinNode) Joined() *dag.Var { @@ -185,7 +185,7 @@ func (t *ChunkedJoinNode) Joined() *dag.Var { func (t *ChunkedJoinNode) RemoveAllInputs() { for i, in := range t.InputStreams().RawArray() { - in.Disconnect(t, i) + in.StreamNotTo(t, i) } t.InputStreams().Resize(0) } diff --git a/common/pkgs/ioswitch2/ops2/clone.go b/common/pkgs/ioswitch2/ops2/clone.go index aed4094..7ede930 100644 --- a/common/pkgs/ioswitch2/ops2/clone.go +++ b/common/pkgs/ioswitch2/ops2/clone.go @@ -83,7 +83,7 @@ func (b *GraphNodeBuilder) NewCloneStream() *CloneStreamType { func (t *CloneStreamType) SetInput(raw *dag.Var) { t.InputStreams().EnsureSize(1) - raw.Connect(t, 0) + raw.StreamTo(t, 0) } func (t *CloneStreamType) NewOutput() *dag.Var { @@ -117,7 +117,7 @@ func (b *GraphNodeBuilder) NewCloneValue() *CloneVarType { func (t *CloneVarType) SetInput(raw *dag.Var) { t.InputValues().EnsureSize(1) - raw.Connect(t, 0) + raw.ValueTo(t, 0) } func (t *CloneVarType) NewOutput() *dag.Var { diff --git a/common/pkgs/ioswitch2/ops2/ec.go b/common/pkgs/ioswitch2/ops2/ec.go index 616e370..9ffa468 100644 --- a/common/pkgs/ioswitch2/ops2/ec.go +++ b/common/pkgs/ioswitch2/ops2/ec.go @@ -222,12 +222,12 @@ func (b *GraphNodeBuilder) NewECMultiply(ec cdssdk.ECRedundancy) *ECMultiplyNode func (t *ECMultiplyNode) AddInput(str *dag.Var, dataIndex int) { t.InputIndexes = append(t.InputIndexes, dataIndex) idx := t.InputStreams().EnlargeOne() - str.Connect(t, idx) + str.StreamTo(t, idx) } func (t *ECMultiplyNode) RemoveAllInputs() { for i, in := range t.InputStreams().RawArray() { - in.Disconnect(t, i) + in.StreamNotTo(t, i) } t.InputStreams().Resize(0) t.InputIndexes = nil diff --git a/common/pkgs/ioswitch2/ops2/file.go b/common/pkgs/ioswitch2/ops2/file.go index 61a3694..be3920a 100644 --- a/common/pkgs/ioswitch2/ops2/file.go +++ b/common/pkgs/ioswitch2/ops2/file.go @@ -133,7 +133,7 @@ func (t *FileWriteNode) Input() dag.Slot { func (t *FileWriteNode) SetInput(str *dag.Var) { t.InputStreams().EnsureSize(1) - str.Connect(t, 0) + str.StreamTo(t, 0) } func (t *FileWriteNode) GenerateOp() (exec.Op, error) { diff --git a/common/pkgs/ioswitch2/ops2/multipart.go b/common/pkgs/ioswitch2/ops2/multipart.go index 969cab5..c98d2cb 100644 --- a/common/pkgs/ioswitch2/ops2/multipart.go +++ b/common/pkgs/ioswitch2/ops2/multipart.go @@ -1,5 +1,6 @@ package ops2 +/* import ( "fmt" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" @@ -186,3 +187,4 @@ func (t MultipartUploadNode) GenerateOp() (exec.Op, error) { PartSize: t.PartSize, }, nil } +*/ diff --git a/common/pkgs/ioswitch2/ops2/range.go b/common/pkgs/ioswitch2/ops2/range.go index a8379a3..b2bf66f 100644 --- a/common/pkgs/ioswitch2/ops2/range.go +++ b/common/pkgs/ioswitch2/ops2/range.go @@ -88,7 +88,7 @@ func (b *GraphNodeBuilder) NewRange() *RangeNode { func (t *RangeNode) RangeStream(input *dag.Var, rng exec.Range) *dag.Var { t.InputStreams().EnsureSize(1) - input.Connect(t, 0) + input.StreamTo(t, 0) t.Range = rng output := t.Graph().NewVar() t.OutputStreams().Setup(t, output, 0) diff --git a/common/pkgs/ioswitch2/ops2/shard_store.go b/common/pkgs/ioswitch2/ops2/shard_store.go index 6e98c98..ba6fcff 100644 --- a/common/pkgs/ioswitch2/ops2/shard_store.go +++ b/common/pkgs/ioswitch2/ops2/shard_store.go @@ -164,6 +164,7 @@ type ShardWriteNode struct { func (b *GraphNodeBuilder) NewShardWrite(stgID cdssdk.StorageID, fileHashStoreKey string) *ShardWriteNode { node := &ShardWriteNode{ + StorageID: stgID, FileHashStoreKey: fileHashStoreKey, } b.AddNode(node) @@ -172,7 +173,7 @@ func (b *GraphNodeBuilder) NewShardWrite(stgID cdssdk.StorageID, fileHashStoreKe func (t *ShardWriteNode) SetInput(input *dag.Var) { t.InputStreams().EnsureSize(1) - input.Connect(t, 0) + input.StreamTo(t, 0) t.OutputValues().SetupNew(t, t.Graph().NewVar()) } diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index 8310d83..25bb75b 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -282,7 +282,18 @@ func (p *DefaultParser) buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToN switch t := t.(type) { case *ioswitch2.ToShardStore: n := ctx.DAG.NewShardWrite(t.Storage.StorageID, t.FileHashStoreKey) - n.Env().ToEnvWorker(&ioswitch2.AgentWorker{Node: t.Hub}) + + switch addr := t.Hub.Address.(type) { + case *cdssdk.HttpAddressInfo: + n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Node: t.Hub}) + + case *cdssdk.GRPCAddressInfo: + n.Env().ToEnvWorker(&ioswitch2.AgentWorker{Node: t.Hub, Address: *addr}) + + default: + return nil, fmt.Errorf("unsupported node address type %T", addr) + } + n.Env().Pinned = true return n, nil @@ -404,9 +415,9 @@ func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool { // F->Split->Join->T 变换为:F->T splitInput := splitNode.InputStreams().Get(0) for _, to := range joinNode.Joined().To().RawArray() { - splitInput.Connect(to.Node, to.SlotIndex) + splitInput.StreamTo(to.Node, to.SlotIndex) } - splitInput.Disconnect(splitNode, 0) + splitInput.StreamNotTo(splitNode, 0) // 并删除这两个指令 ctx.DAG.RemoveNode(joinNode) @@ -528,7 +539,7 @@ func (p *DefaultParser) generateRange(ctx *ParseContext) { Offset: toRng.Offset - ctx.StreamRange.Offset, Length: toRng.Length, }) - toInput.Var.Disconnect(toNode, toInput.Index) + toInput.Var.StreamNotTo(toNode, toInput.Index) toNode.SetInput(rnged) } else { @@ -544,7 +555,7 @@ func (p *DefaultParser) generateRange(ctx *ParseContext) { Offset: toRng.Offset - blkStart, Length: toRng.Length, }) - toInput.Var.Disconnect(toNode, toInput.Index) + toInput.Var.StreamNotTo(toNode, toInput.Index) toNode.SetInput(rnged) } } @@ -561,7 +572,7 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) { c := ctx.DAG.NewCloneStream() *c.Env() = *node.Env() for _, to := range out.To().RawArray() { - c.NewOutput().Connect(to.Node, to.SlotIndex) + c.NewOutput().StreamTo(to.Node, to.SlotIndex) } out.To().Resize(0) c.SetInput(out) @@ -575,7 +586,7 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) { t := ctx.DAG.NewCloneValue() *t.Env() = *node.Env() for _, to := range out.To().RawArray() { - t.NewOutput().Connect(to.Node, to.SlotIndex) + t.NewOutput().ValueTo(to.Node, to.SlotIndex) } out.To().Resize(0) t.SetInput(out) diff --git a/common/pkgs/ioswitchlrc/ops2/chunked.go b/common/pkgs/ioswitchlrc/ops2/chunked.go index dfceafa..3c75f6f 100644 --- a/common/pkgs/ioswitchlrc/ops2/chunked.go +++ b/common/pkgs/ioswitchlrc/ops2/chunked.go @@ -116,7 +116,7 @@ func (b *GraphNodeBuilder) NewChunkedSplit(chunkSize int) *ChunkedSplitNode { func (t *ChunkedSplitNode) Split(input *dag.Var, cnt int) { t.InputStreams().EnsureSize(1) - input.Connect(t, 0) + input.StreamTo(t, 0) t.OutputStreams().Resize(cnt) for i := 0; i < cnt; i++ { t.OutputStreams().Setup(t, t.Graph().NewVar(), i) @@ -136,11 +136,11 @@ func (t *ChunkedSplitNode) Clear() { return } - t.InputStreams().Get(0).Disconnect(t, 0) + t.InputStreams().Get(0).StreamNotTo(t, 0) t.InputStreams().Resize(0) for _, out := range t.OutputStreams().RawArray() { - out.DisconnectAll() + out.NoInputAllStream() } t.OutputStreams().Resize(0) } @@ -176,7 +176,7 @@ func (b *GraphNodeBuilder) NewChunkedJoin(chunkSize int) *ChunkedJoinNode { func (t *ChunkedJoinNode) AddInput(str *dag.Var) { idx := t.InputStreams().EnlargeOne() - str.Connect(t, idx) + str.StreamTo(t, idx) } func (t *ChunkedJoinNode) Joined() *dag.Var { @@ -185,7 +185,7 @@ func (t *ChunkedJoinNode) Joined() *dag.Var { func (t *ChunkedJoinNode) RemoveAllInputs() { for i, in := range t.InputStreams().RawArray() { - in.Disconnect(t, i) + in.StreamNotTo(t, i) } t.InputStreams().Resize(0) } diff --git a/common/pkgs/ioswitchlrc/ops2/clone.go b/common/pkgs/ioswitchlrc/ops2/clone.go index aed4094..7ede930 100644 --- a/common/pkgs/ioswitchlrc/ops2/clone.go +++ b/common/pkgs/ioswitchlrc/ops2/clone.go @@ -83,7 +83,7 @@ func (b *GraphNodeBuilder) NewCloneStream() *CloneStreamType { func (t *CloneStreamType) SetInput(raw *dag.Var) { t.InputStreams().EnsureSize(1) - raw.Connect(t, 0) + raw.StreamTo(t, 0) } func (t *CloneStreamType) NewOutput() *dag.Var { @@ -117,7 +117,7 @@ func (b *GraphNodeBuilder) NewCloneValue() *CloneVarType { func (t *CloneVarType) SetInput(raw *dag.Var) { t.InputValues().EnsureSize(1) - raw.Connect(t, 0) + raw.ValueTo(t, 0) } func (t *CloneVarType) NewOutput() *dag.Var { diff --git a/common/pkgs/ioswitchlrc/ops2/ec.go b/common/pkgs/ioswitchlrc/ops2/ec.go index 1d6d5e3..0bced37 100644 --- a/common/pkgs/ioswitchlrc/ops2/ec.go +++ b/common/pkgs/ioswitchlrc/ops2/ec.go @@ -131,12 +131,12 @@ func (b *GraphNodeBuilder) NewLRCConstructAny(lrc cdssdk.LRCRedundancy) *LRCCons func (t *LRCConstructAnyNode) AddInput(str *dag.Var, dataIndex int) { t.InputIndexes = append(t.InputIndexes, dataIndex) idx := t.InputStreams().EnlargeOne() - str.Connect(t, idx) + str.StreamTo(t, idx) } func (t *LRCConstructAnyNode) RemoveAllInputs() { for i, in := range t.InputStreams().RawArray() { - in.Disconnect(t, i) + in.StreamNotTo(t, i) } t.InputStreams().Resize(0) t.InputIndexes = nil @@ -191,7 +191,7 @@ func (t *LRCConstructGroupNode) SetupForTarget(blockIdx int, inputs []*dag.Var) t.InputStreams().Resize(0) for _, in := range inputs { idx := t.InputStreams().EnlargeOne() - in.Connect(t, idx) + in.StreamTo(t, idx) } output := t.Graph().NewVar() diff --git a/common/pkgs/ioswitchlrc/ops2/range.go b/common/pkgs/ioswitchlrc/ops2/range.go index a8379a3..b2bf66f 100644 --- a/common/pkgs/ioswitchlrc/ops2/range.go +++ b/common/pkgs/ioswitchlrc/ops2/range.go @@ -88,7 +88,7 @@ func (b *GraphNodeBuilder) NewRange() *RangeNode { func (t *RangeNode) RangeStream(input *dag.Var, rng exec.Range) *dag.Var { t.InputStreams().EnsureSize(1) - input.Connect(t, 0) + input.StreamTo(t, 0) t.Range = rng output := t.Graph().NewVar() t.OutputStreams().Setup(t, output, 0) diff --git a/common/pkgs/ioswitchlrc/ops2/shard_store.go b/common/pkgs/ioswitchlrc/ops2/shard_store.go index daae238..89384cf 100644 --- a/common/pkgs/ioswitchlrc/ops2/shard_store.go +++ b/common/pkgs/ioswitchlrc/ops2/shard_store.go @@ -171,7 +171,7 @@ func (b *GraphNodeBuilder) NewShardWrite(fileHashStoreKey string) *ShardWriteNod func (t *ShardWriteNode) SetInput(input *dag.Var) { t.InputStreams().EnsureSize(1) - input.Connect(t, 0) + input.StreamTo(t, 0) t.OutputValues().SetupNew(t, t.Graph().NewVar()) } diff --git a/common/pkgs/ioswitchlrc/parser/passes.go b/common/pkgs/ioswitchlrc/parser/passes.go index 31172c0..0f5473e 100644 --- a/common/pkgs/ioswitchlrc/parser/passes.go +++ b/common/pkgs/ioswitchlrc/parser/passes.go @@ -72,6 +72,7 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (ops2.FromNode, err t.Open.WithNullableLength(blkRange.Offset, blkRange.Length) } + // TODO2 支持HTTP协议 t.Env().ToEnvWorker(&ioswitchlrc.AgentWorker{Node: f.Node, Address: *f.Node.Address.(*cdssdk.GRPCAddressInfo)}) t.Env().Pinned = true @@ -101,7 +102,17 @@ func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (ops2.ToNode, error) { switch t := t.(type) { case *ioswitchlrc.ToNode: n := ctx.DAG.NewShardWrite(t.FileHashStoreKey) - n.Env().ToEnvWorker(&ioswitchlrc.AgentWorker{Node: t.Hub}) + switch addr := t.Hub.Address.(type) { + // case *cdssdk.HttpAddressInfo: + // n.Env().ToEnvWorker(&ioswitchlrc.HttpHubWorker{Node: t.Hub}) + // TODO2 支持HTTP协议 + case *cdssdk.GRPCAddressInfo: + n.Env().ToEnvWorker(&ioswitchlrc.AgentWorker{Node: t.Hub, Address: *addr}) + + default: + return nil, fmt.Errorf("unsupported node address type %T", addr) + } + n.Env().Pinned = true return n, nil @@ -227,7 +238,7 @@ func generateRange(ctx *GenerateContext) { Offset: toRng.Offset - ctx.StreamRange.Offset, Length: toRng.Length, }) - toInput.Var.Disconnect(toNode, toInput.Index) + toInput.Var.StreamNotTo(toNode, toInput.Index) toNode.SetInput(rnged) } else { @@ -243,7 +254,7 @@ func generateRange(ctx *GenerateContext) { Offset: toRng.Offset - blkStart, Length: toRng.Length, }) - toInput.Var.Disconnect(toNode, toInput.Index) + toInput.Var.StreamNotTo(toNode, toInput.Index) toNode.SetInput(rnged) } } @@ -260,7 +271,7 @@ func generateClone(ctx *GenerateContext) { t := ctx.DAG.NewCloneStream() *t.Env() = *node.Env() for _, to := range out.To().RawArray() { - t.NewOutput().Connect(to.Node, to.SlotIndex) + t.NewOutput().StreamTo(to.Node, to.SlotIndex) } out.To().Resize(0) t.SetInput(out) @@ -274,7 +285,7 @@ func generateClone(ctx *GenerateContext) { t := ctx.DAG.NewCloneValue() *t.Env() = *node.Env() for _, to := range out.To().RawArray() { - t.NewOutput().Connect(to.Node, to.SlotIndex) + t.NewOutput().ValueTo(to.Node, to.SlotIndex) } out.To().Resize(0) t.SetInput(out) diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go index 11321ca..23dffdc 100644 --- a/common/pkgs/storage/local/shard_store.go +++ b/common/pkgs/storage/local/shard_store.go @@ -2,6 +2,7 @@ package local import ( "crypto/sha256" + "errors" "fmt" "io" "io/fs" @@ -18,9 +19,11 @@ import ( const ( TempDir = "tmp" BlocksDir = "blocks" + SvcName = "LocalShardStore" ) type ShardStore struct { + stg cdssdk.Storage cfg cdssdk.LocalShardStorage } @@ -31,26 +34,34 @@ func NewShardStore(stg cdssdk.Storage, cfg cdssdk.LocalShardStorage) (*ShardStor } return &ShardStore{ + stg: stg, cfg: cfg, }, nil } func (s *ShardStore) Start(ch *types.StorageEventChan) { - logger.Infof("local shard store start, root: %v, max size: %v", s.cfg.Root, s.cfg.MaxSize) + s.getLogger().Infof("local shard store start, root: %v, max size: %v", s.cfg.Root, s.cfg.MaxSize) } func (s *ShardStore) Stop() { - logger.Infof("local shard store stop") + s.getLogger().Infof("local shard store stop") } func (s *ShardStore) New() types.ShardWriter { - file, err := os.CreateTemp(filepath.Join(s.cfg.Root, "tmp"), "tmp-*") + tmpDir := filepath.Join(s.cfg.Root, TempDir) + + err := os.MkdirAll(tmpDir, 0755) + if err != nil { + return utils.ErrorShardWriter(err) + } + + file, err := os.CreateTemp(tmpDir, "tmp-*") if err != nil { return utils.ErrorShardWriter(err) } return &ShardWriter{ - path: filepath.Join(s.cfg.Root, "tmp", file.Name()), + path: file.Name(), // file.Name 包含tmpDir路径 file: file, hasher: sha256.New(), owner: s, @@ -90,6 +101,10 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { blockDir := filepath.Join(s.cfg.Root, BlocksDir) err := filepath.WalkDir(blockDir, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() { return nil } @@ -108,7 +123,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) { }) return nil }) - if err != nil { + if err != nil && !errors.Is(err, os.ErrNotExist) { return nil, err } @@ -122,7 +137,7 @@ func (s *ShardStore) Purge(removes []cdssdk.FileHash) error { path := filepath.Join(s.cfg.Root, BlocksDir, fileName[:2], fileName) err := os.Remove(path) if err != nil { - logger.Warnf("remove file %v: %v", path, err) + s.getLogger().Warnf("remove file %v: %v", path, err) } } @@ -138,18 +153,20 @@ func (s *ShardStore) Stats() types.Stats { } func (s *ShardStore) onWritterAbort(w *ShardWriter) { - logger.Debugf("writting file %v aborted", w.path) + s.getLogger().Debugf("writting file %v aborted", w.path) s.removeTempFile(w.path) } func (s *ShardStore) onWritterFinish(w *ShardWriter, hash cdssdk.FileHash) (types.FileInfo, error) { - logger.Debugf("write file %v finished, size: %v, hash: %v", w.path, w.size, hash) + log := s.getLogger() + + log.Debugf("write file %v finished, size: %v, hash: %v", w.path, w.size, hash) blockDir := filepath.Join(s.cfg.Root, BlocksDir, string(hash)[:2]) err := os.MkdirAll(blockDir, 0755) if err != nil { s.removeTempFile(w.path) - logger.Warnf("make block dir %v: %v", blockDir, err) + log.Warnf("make block dir %v: %v", blockDir, err) return types.FileInfo{}, fmt.Errorf("making block dir: %w", err) } @@ -157,7 +174,7 @@ func (s *ShardStore) onWritterFinish(w *ShardWriter, hash cdssdk.FileHash) (type err = os.Rename(w.path, name) if err != nil { s.removeTempFile(w.path) - logger.Warnf("rename %v to %v: %v", w.path, name, err) + log.Warnf("rename %v to %v: %v", w.path, name, err) return types.FileInfo{}, fmt.Errorf("rename file: %w", err) } @@ -171,6 +188,10 @@ func (s *ShardStore) onWritterFinish(w *ShardWriter, hash cdssdk.FileHash) (type func (s *ShardStore) removeTempFile(path string) { err := os.Remove(path) if err != nil { - logger.Warnf("removing temp file %v: %v", path, err) + s.getLogger().Warnf("removing temp file %v: %v", path, err) } } + +func (s *ShardStore) getLogger() logger.Logger { + return logger.WithField("Svc", SvcName).WithField("Storage", s.stg) +} diff --git a/coordinator/internal/mq/storage.go b/coordinator/internal/mq/storage.go index f066601..7499f03 100644 --- a/coordinator/internal/mq/storage.go +++ b/coordinator/internal/mq/storage.go @@ -51,6 +51,18 @@ func (svc *Service) GetStorageDetails(msg *coormq.GetStorageDetails) (*coormq.Ge for _, hub := range masterHubs { masterHubMap[hub.NodeID] = hub } + for _, stg := range stgsMp { + if stg.Storage.MasterHub != 0 { + hub, ok := masterHubMap[stg.Storage.MasterHub] + if !ok { + logger.Warnf("master hub %v of storage %v not found, this storage will not be add to result", stg.Storage.MasterHub, stg.Storage) + delete(stgsMp, stg.Storage.StorageID) + continue + } + + stg.MasterHub = &hub + } + } // 获取分片存储 shards, err := svc.db2.ShardStorage().BatchGetByStorageIDs(tx, msg.StorageIDs) @@ -58,14 +70,12 @@ func (svc *Service) GetStorageDetails(msg *coormq.GetStorageDetails) (*coormq.Ge return fmt.Errorf("getting shard storage: %w", err) } for _, shard := range shards { - stgsMp[shard.StorageID].Shard = &shard - } - - for _, stg := range stgsMp { - if stg.Shard != nil { - hub := masterHubMap[stg.MasterHub.NodeID] - stg.MasterHub = &hub + stg := stgsMp[shard.StorageID] + if stg == nil { + continue } + + stg.Shard = &shard } // 获取共享存储的相关信息 @@ -74,7 +84,12 @@ func (svc *Service) GetStorageDetails(msg *coormq.GetStorageDetails) (*coormq.Ge return fmt.Errorf("getting shared storage: %w", err) } for _, shared := range shareds { - stgsMp[shared.StorageID].Shared = &shared + stg := stgsMp[shared.StorageID] + if stg == nil { + continue + } + + stg.Shared = &shared } return nil @@ -118,6 +133,18 @@ func (svc *Service) GetUserStorageDetails(msg *coormq.GetUserStorageDetails) (*c for _, hub := range masterHubs { masterHubMap[hub.NodeID] = hub } + for _, stg := range stgsMp { + if stg.Storage.MasterHub != 0 { + hub, ok := masterHubMap[stg.Storage.MasterHub] + if !ok { + logger.Warnf("master hub %v of storage %v not found, this storage will not be add to result", stg.Storage.MasterHub, stg.Storage) + delete(stgsMp, stg.Storage.StorageID) + continue + } + + stg.MasterHub = &hub + } + } stgIDs := lo.Map(stgs, func(stg cdssdk.Storage, i int) cdssdk.StorageID { return stg.StorageID }) @@ -127,13 +154,12 @@ func (svc *Service) GetUserStorageDetails(msg *coormq.GetUserStorageDetails) (*c return fmt.Errorf("getting shard storage: %w", err) } for _, shard := range shards { - stgsMp[shard.StorageID].Shard = &shard - } - for _, stg := range stgsMp { - if stg.Shard != nil { - hub := masterHubMap[stg.MasterHub.NodeID] - stg.MasterHub = &hub + stg := stgsMp[shard.StorageID] + if stg == nil { + continue } + + stg.Shard = &shard } // 获取共享存储的相关信息 @@ -142,7 +168,12 @@ func (svc *Service) GetUserStorageDetails(msg *coormq.GetUserStorageDetails) (*c return fmt.Errorf("getting shared storage: %w", err) } for _, shared := range shareds { - stgsMp[shared.StorageID].Shared = &shared + stg := stgsMp[shared.StorageID] + if stg == nil { + continue + } + + stg.Shared = &shared } return nil