diff --git a/agent/internal/task/cache_move_package.go b/agent/internal/task/cache_move_package.go index bf3b607..3708499 100644 --- a/agent/internal/task/cache_move_package.go +++ b/agent/internal/task/cache_move_package.go @@ -76,6 +76,8 @@ func (t *CacheMovePackage) do(ctx TaskContext) error { if err != nil { return fmt.Errorf("creating ipfs file: %w", err) } + + ctx.packageStat.AddAccessCounter(t.packageID, *stgglb.Local.NodeID, 1) } _, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(t.packageID, *stgglb.Local.NodeID)) diff --git a/agent/internal/task/storage_load_package.go b/agent/internal/task/storage_load_package.go index 28f26bc..eda7bd3 100644 --- a/agent/internal/task/storage_load_package.go +++ b/agent/internal/task/storage_load_package.go @@ -99,6 +99,7 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e if err != nil { return err } + ctx.packageStat.AddAccessCounter(t.packageID, *stgglb.Local.NodeID, 1) } _, err = coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(t.userID, t.storageID, t.packageID, t.pinnedBlocks)) diff --git a/agent/internal/task/task.go b/agent/internal/task/task.go index 397b59e..b174495 100644 --- a/agent/internal/task/task.go +++ b/agent/internal/task/task.go @@ -5,12 +5,14 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" + packagestat "gitlink.org.cn/cloudream/storage/common/pkgs/package_stat" ) type TaskContext struct { distlock *distlock.Service connectivity *connectivity.Collector downloader *downloader.Downloader + packageStat *packagestat.PackageStat } // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, @@ -25,10 +27,11 @@ type Task = task.Task[TaskContext] type CompleteOption = task.CompleteOption -func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader) Manager { +func NewManager(distlock 
*distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, packageStat *packagestat.PackageStat) Manager { return task.NewManager(TaskContext{ distlock: distlock, connectivity: connectivity, downloader: downloader, + packageStat: packageStat, }) } diff --git a/agent/main.go b/agent/main.go index 3541bc8..314f781 100644 --- a/agent/main.go +++ b/agent/main.go @@ -4,9 +4,10 @@ import ( "fmt" "net" "os" + "time" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - log "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/agent/internal/config" "gitlink.org.cn/cloudream/storage/agent/internal/task" @@ -15,6 +16,7 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" + packagestat "gitlink.org.cn/cloudream/storage/common/pkgs/package_stat" "google.golang.org/grpc" @@ -38,7 +40,7 @@ func main() { os.Exit(1) } - err = log.Init(&config.Cfg().Logger) + err = logger.Init(&config.Cfg().Logger) if err != nil { fmt.Printf("init logger failed, err: %s", err.Error()) os.Exit(1) @@ -51,7 +53,7 @@ func main() { // 启动网络连通性检测,并就地检测一次 conCol := connectivity.NewCollector(&config.Cfg().Connectivity, func(collector *connectivity.Collector) { - log := log.WithField("Connectivity", "") + log := logger.WithField("Connectivity", "") coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { @@ -84,25 +86,31 @@ func main() { }) conCol.CollectInPlace() + pkgStat := packagestat.NewPackageStat(packagestat.Config{ + // TODO 考虑放到配置里 + ReportInterval: time.Second * 10, + }) + go servePackageStat(pkgStat) + distlock, err := distlock.NewService(&config.Cfg().DistLock) if err != nil { - log.Fatalf("new ipfs failed, err: %s", err.Error()) + logger.Fatalf("new ipfs failed, err: %s", err.Error()) } sw 
:= exec.NewWorker() dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol) - taskMgr := task.NewManager(distlock, &conCol, &dlder) + taskMgr := task.NewManager(distlock, &conCol, &dlder, pkgStat) // 启动命令服务器 // TODO 需要设计AgentID持久化机制 agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr), config.Cfg().ID, &config.Cfg().RabbitMQ) if err != nil { - log.Fatalf("new agent server failed, err: %s", err.Error()) + logger.Fatalf("new agent server failed, err: %s", err.Error()) } agtSvr.OnError(func(err error) { - log.Warnf("agent server err: %s", err.Error()) + logger.Warnf("agent server err: %s", err.Error()) }) go serveAgentServer(agtSvr) @@ -110,7 +118,7 @@ func main() { listenAddr := config.Cfg().GRPC.MakeListenAddress() lis, err := net.Listen("tcp", listenAddr) if err != nil { - log.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error()) + logger.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error()) } s := grpc.NewServer() agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&sw)) @@ -123,45 +131,69 @@ func main() { } func serveAgentServer(server *agtmq.Server) { - log.Info("start serving command server") + logger.Info("start serving command server") err := server.Serve() if err != nil { - log.Errorf("command server stopped with error: %s", err.Error()) + logger.Errorf("command server stopped with error: %s", err.Error()) } - log.Info("command server stopped") + logger.Info("command server stopped") // TODO 仅简单结束了程序 os.Exit(1) } func serveGRPC(s *grpc.Server, lis net.Listener) { - log.Info("start serving grpc") + logger.Info("start serving grpc") err := s.Serve(lis) if err != nil { - log.Errorf("grpc stopped with error: %s", err.Error()) + logger.Errorf("grpc stopped with error: %s", err.Error()) } - log.Info("grpc stopped") + logger.Info("grpc stopped") // TODO 仅简单结束了程序 os.Exit(1) } func serveDistLock(svc *distlock.Service) { - log.Info("start serving distlock") + logger.Info("start serving distlock") err := svc.Serve() if err != nil { - 
log.Errorf("distlock stopped with error: %s", err.Error()) + logger.Errorf("distlock stopped with error: %s", err.Error()) } - log.Info("distlock stopped") + logger.Info("distlock stopped") + + // TODO 仅简单结束了程序 + os.Exit(1) +} + +func servePackageStat(svc *packagestat.PackageStat) { + logger.Info("start serving package stat") + + ch := svc.Start() +loop: + for { + val, err := ch.Receive() + if err != nil { + logger.Errorf("package stat stopped with error: %v", err) + break + } + + switch val := val.(type) { + case error: + logger.Errorf("package stat stopped with error: %v", val) + break loop + } + } + logger.Info("package stat stopped") // TODO 仅简单结束了程序 os.Exit(1) diff --git a/client/internal/cmdline/getp.go b/client/internal/cmdline/getp.go index d98f26d..f053bf1 100644 --- a/client/internal/cmdline/getp.go +++ b/client/internal/cmdline/getp.go @@ -12,6 +12,7 @@ import ( "github.com/inhies/go-bytesize" "github.com/spf13/cobra" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) @@ -117,6 +118,9 @@ func getpByID(cmdCtx *CommandContext, id cdssdk.PackageID, output string) { return fmt.Errorf("copy object data to local file failed, err: %w", err) } + if stgglb.Local.NodeID != nil { + cmdCtx.Cmdline.Svc.PackageStat.AddAccessCounter(id, *stgglb.Local.NodeID, 1) + } return nil }() if err != nil { diff --git a/client/internal/cmdline/scanner.go b/client/internal/cmdline/scanner.go index 0758003..ff3f55f 100644 --- a/client/internal/cmdline/scanner.go +++ b/client/internal/cmdline/scanner.go @@ -41,5 +41,7 @@ func init() { parseScannerEventCmdTrie.MustAdd(scevt.NewCleanPinned, reflect2.TypeNameOf[scevt.CleanPinned]()) + parseScannerEventCmdTrie.MustAdd(scevt.NewUpdatePackageAccessStatAmount, reflect2.TypeNameOf[scevt.UpdatePackageAccessStatAmount]()) + commands.MustAdd(ScannerPostEvent, "scanner", "event") } diff --git a/client/internal/cmdline/test.go 
b/client/internal/cmdline/test.go index 1fb7427..02f9e37 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -19,8 +19,8 @@ import ( func init() { rootCmd.AddCommand(&cobra.Command{ - Use: "test2", - Short: "test2", + Use: "test", + Short: "test", // Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { // cmdCtx := GetCmdCtx(cmd) @@ -160,8 +160,8 @@ func init() { }) rootCmd.AddCommand(&cobra.Command{ - Use: "test", - Short: "test", + Use: "test4", + Short: "test4", // Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { // cmdCtx := GetCmdCtx(cmd) @@ -202,8 +202,8 @@ func init() { }) rootCmd.AddCommand(&cobra.Command{ - Use: "test4", - Short: "test4", + Use: "test3", + Short: "test3", // Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { // cmdCtx := GetCmdCtx(cmd) @@ -228,7 +228,7 @@ func init() { ioswitchlrc.NewFromNode("QmQBKncEDqxw3BrGr3th3gS3jUC2fizGz1w29ZxxrrKfNv", &nodes.Nodes[0], 2), }, []ioswitchlrc.To{ ioswitchlrc.NewToNodeWithRange(nodes.Nodes[1], -1, "-1", exec.Range{0, &le}), - ioswitchlrc.NewToNode(nodes.Nodes[1], 0, "0"), + ioswitchlrc.NewToNodeWithRange(nodes.Nodes[1], 0, "0", exec.Range{10, &le}), ioswitchlrc.NewToNode(nodes.Nodes[1], 1, "1"), ioswitchlrc.NewToNode(nodes.Nodes[1], 2, "2"), ioswitchlrc.NewToNode(nodes.Nodes[1], 3, "3"), diff --git a/client/internal/http/object.go b/client/internal/http/object.go index 36105c0..0c402f1 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -14,6 +14,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" myhttp "gitlink.org.cn/cloudream/common/utils/http" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" ) @@ -121,42 +122,51 @@ func (s *ObjectService) Download(ctx *gin.Context) { ctx.Writer.Header().Set("Content-Type", fmt.Sprintf("%s;boundary=%s", 
myhttp.ContentTypeMultiPart, mw.Boundary())) ctx.Writer.WriteHeader(http.StatusOK) + sendSize := int64(0) if req.PartSize == 0 { - err = sendFileOnePart(mw, "file", path.Base(file.Object.Path), file.File) + sendSize, err = sendFileOnePart(mw, "file", path.Base(file.Object.Path), file.File) } else { - err = sendFileMultiPart(mw, "file", path.Base(file.Object.Path), file.File, req.PartSize) + sendSize, err = sendFileMultiPart(mw, "file", path.Base(file.Object.Path), file.File, req.PartSize) } + + // TODO 当client不在某个代理节点上时如何处理? + if stgglb.Local.NodeID != nil { + s.svc.PackageStat.AddAccessCounter(file.Object.PackageID, *stgglb.Local.NodeID, float64(sendSize)/float64(file.Object.Size)) + } + if err != nil { log.Warnf("copying file: %s", err.Error()) } } -func sendFileMultiPart(muWriter *multipart.Writer, fieldName, fileName string, file io.ReadCloser, partSize int64) error { +func sendFileMultiPart(muWriter *multipart.Writer, fieldName, fileName string, file io.ReadCloser, partSize int64) (int64, error) { + total := int64(0) for { w, err := muWriter.CreateFormFile(fieldName, ul.PathEscape(fileName)) if err != nil { - return fmt.Errorf("create form file failed, err: %w", err) + return total, fmt.Errorf("create form file failed, err: %w", err) } n, err := io.Copy(w, io.LimitReader(file, partSize)) if err != nil { - return err + return total, err } if n == 0 { break } + total += n } - return nil + return total, nil } -func sendFileOnePart(muWriter *multipart.Writer, fieldName, fileName string, file io.ReadCloser) error { +func sendFileOnePart(muWriter *multipart.Writer, fieldName, fileName string, file io.ReadCloser) (int64, error) { w, err := muWriter.CreateFormFile(fieldName, ul.PathEscape(fileName)) if err != nil { - return fmt.Errorf("create form file failed, err: %w", err) + return 0, fmt.Errorf("create form file failed, err: %w", err) } - _, err = io.Copy(w, file) - return err + n, err := io.Copy(w, file) + return n, err } func (s *ObjectService) UpdateInfo(ctx
*gin.Context) { diff --git a/client/internal/services/service.go b/client/internal/services/service.go index 847c547..7e13bab 100644 --- a/client/internal/services/service.go +++ b/client/internal/services/service.go @@ -4,18 +4,21 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/storage/client/internal/task" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" + packagestat "gitlink.org.cn/cloudream/storage/common/pkgs/package_stat" ) type Service struct { - DistLock *distlock.Service - TaskMgr *task.Manager - Downloader *downloader.Downloader + DistLock *distlock.Service + TaskMgr *task.Manager + Downloader *downloader.Downloader + PackageStat *packagestat.PackageStat } -func NewService(distlock *distlock.Service, taskMgr *task.Manager, downloader *downloader.Downloader) (*Service, error) { +func NewService(distlock *distlock.Service, taskMgr *task.Manager, downloader *downloader.Downloader, pkgStat *packagestat.PackageStat) (*Service, error) { return &Service{ - DistLock: distlock, - TaskMgr: taskMgr, - Downloader: downloader, + DistLock: distlock, + TaskMgr: taskMgr, + Downloader: downloader, + PackageStat: pkgStat, }, nil } diff --git a/client/main.go b/client/main.go index 47502ac..f1b9898 100644 --- a/client/main.go +++ b/client/main.go @@ -18,6 +18,7 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" + packagestat "gitlink.org.cn/cloudream/storage/common/pkgs/package_stat" ) func main() { @@ -83,11 +84,17 @@ func main() { } go serveDistLock(distlockSvc) + pkgStat := packagestat.NewPackageStat(packagestat.Config{ + // TODO 考虑放到配置里 + ReportInterval: time.Second * 10, + }) + go servePackageStat(pkgStat) + taskMgr := task.NewManager(distlockSvc, &conCol) dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol) - svc, err := services.NewService(distlockSvc, &taskMgr, 
&dlder) + svc, err := services.NewService(distlockSvc, &taskMgr, &dlder, pkgStat) if err != nil { logger.Warnf("new services failed, err: %s", err.Error()) os.Exit(1) @@ -116,3 +123,27 @@ func serveDistLock(svc *distlock.Service) { // TODO 仅简单结束了程序 os.Exit(1) } + +func servePackageStat(svc *packagestat.PackageStat) { + logger.Info("start serving package stat") + + ch := svc.Start() +loop: + for { + val, err := ch.Receive() + if err != nil { + logger.Errorf("package stat stopped with error: %v", err) + break + } + + switch val := val.(type) { + case error: + logger.Errorf("package stat stopped with error: %v", val) + break loop + } + } + logger.Info("package stat stopped") + + // TODO 仅简单结束了程序 + os.Exit(1) +} diff --git a/common/assets/confs/scanner.config.json b/common/assets/confs/scanner.config.json index f31c12a..f39720c 100644 --- a/common/assets/confs/scanner.config.json +++ b/common/assets/confs/scanner.config.json @@ -1,4 +1,5 @@ { + "accessStatHistoryAmount": 0.8, "ecFileSizeThreshold": 104857600, "nodeUnavailableSeconds": 300, "logger": { diff --git a/common/assets/scripts/create_database.sql b/common/assets/scripts/create_database.sql index f543931..b78ef70 100644 --- a/common/assets/scripts/create_database.sql +++ b/common/assets/scripts/create_database.sql @@ -114,7 +114,7 @@ create table Package ( PackageID int not null auto_increment primary key comment '包ID', Name varchar(100) not null comment '对象名', BucketID int not null comment '桶ID', - State varchar(100) not null comment '状态' + State varchar(100) not null comment '状态' ); create table Object ( @@ -160,11 +160,12 @@ create table StoragePackage ( primary key(StorageID, PackageID, UserID) ); -create table StoragePackageLog ( - StorageID int not null comment '存储服务ID', +create table PackageAccessStat ( PackageID int not null comment '包ID', - UserID int not null comment '调度了此文件的用户ID', - CreateTime timestamp not null comment '加载Package完成的时间' + NodeID int not null comment '节点ID', + Amount float not null
comment '前一日流量的滑动平均值', + Counter float not null comment '本日的流量', + primary key(PackageID, NodeID) ); create table Location ( diff --git a/common/pkgs/db/model/model.go b/common/pkgs/db/model/model.go index 961f697..60a11ed 100644 --- a/common/pkgs/db/model/model.go +++ b/common/pkgs/db/model/model.go @@ -96,14 +96,14 @@ type StoragePackage struct { State string `db:"State" json:"state"` } -type StoragePackageLog struct { - StorageID cdssdk.StorageID `db:"StorageID" json:"storageID"` - PackageID cdssdk.PackageID `db:"PackageID" json:"packageID"` - UserID cdssdk.UserID `db:"UserID" json:"userID"` - CreateTime time.Time `db:"CreateTime" json:"createTime"` -} - type Location struct { LocationID cdssdk.LocationID `db:"LocationID" json:"locationID"` Name string `db:"Name" json:"name"` } + +type PackageAccessStat struct { + PackageID cdssdk.PackageID `db:"PackageID" json:"packageID"` + NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"` + Amount float64 `db:"Amount" json:"Amount"` // 前一日的读取量的滑动平均值 + Counter float64 `db:"Counter" json:"counter"` // 当日的读取量 +} diff --git a/common/pkgs/db/package_access_stat.go b/common/pkgs/db/package_access_stat.go new file mode 100644 index 0000000..3a5cae0 --- /dev/null +++ b/common/pkgs/db/package_access_stat.go @@ -0,0 +1,61 @@ +package db + +import ( + "github.com/jmoiron/sqlx" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" +) + +type PackageAccessStatDB struct { + *DB +} + +func (db *DB) PackageAccessStat() *PackageAccessStatDB { + return &PackageAccessStatDB{db} +} + +func (*PackageAccessStatDB) Get(ctx SQLContext, pkgID cdssdk.PackageID, nodeID cdssdk.NodeID) (model.PackageAccessStat, error) { + var ret model.PackageAccessStat + err := sqlx.Get(ctx, &ret, "select * from PackageAccessStat where PackageID=? 
and NodeID=?", pkgID, nodeID) + return ret, err +} + +func (*PackageAccessStatDB) GetByPackageID(ctx SQLContext, pkgID cdssdk.PackageID) ([]model.PackageAccessStat, error) { + var ret []model.PackageAccessStat + err := sqlx.Select(ctx, &ret, "select * from PackageAccessStat where PackageID=?", pkgID) + return ret, err +} + +func (*PackageAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.AddPackageAccessStatCounterEntry) error { + sql := "insert into PackageAccessStat(PackageID, NodeID, Counter, Amount) " + + "values(:PackageID, :NodeID, :Value, 0) " + + "on duplicate key update Counter=Counter+:Value" + err := BatchNamedExec(ctx, sql, 4, entries, nil) + return err +} + +func (*PackageAccessStatDB) BatchUpdateAmount(ctx SQLContext, pkgIDs []cdssdk.PackageID, historyWeight float64) error { + stmt, args, err := sqlx.In("update PackageAccessStat set Amount=Amount*?+Counter*(1-?), Counter = 0 where PackageID in (?)", historyWeight, historyWeight, pkgIDs) + if err != nil { + return err + } + + _, err = ctx.Exec(stmt, args...)
+ return err +} + +func (*PackageAccessStatDB) DeleteByPackageID(ctx SQLContext, pkgID cdssdk.PackageID) error { + _, err := ctx.Exec("delete from PackageAccessStat where PackageID=?", pkgID) + return err +} diff --git a/common/pkgs/db/storage_package_log.go b/common/pkgs/db/storage_package_log.go deleted file mode 100644 index b882f30..0000000 --- a/common/pkgs/db/storage_package_log.go +++ /dev/null @@ -1,39 +0,0 @@ -package db - -import ( - "time" - - "github.com/jmoiron/sqlx" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" -) - -type StoragePackageLogDB struct { - *DB -} - -func (db *DB) StoragePackageLog() *StoragePackageLogDB { - return &StoragePackageLogDB{DB: db} -} - -func (*StoragePackageLogDB) Get(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) (model.StoragePackageLog, error) { - var ret model.StoragePackageLog - err := sqlx.Get(ctx, &ret, "select * from StoragePackageLog where StorageID = ? and PackageID = ? and UserID = ?", storageID, packageID, userID) - return ret, err -} - -func (*StoragePackageLogDB) GetByPackageID(ctx SQLContext, packageID cdssdk.PackageID) ([]model.StoragePackageLog, error) { - var ret []model.StoragePackageLog - err := sqlx.Select(ctx, &ret, "select * from StoragePackageLog where PackageID = ?", packageID) - return ret, err -} - -func (*StoragePackageLogDB) Create(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID, createTime time.Time) error { - _, err := ctx.Exec("insert into StoragePackageLog values(?,?,?,?)", storageID, packageID, userID, createTime) - return err -} - -func (*StoragePackageLogDB) Delete(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) error { - _, err := ctx.Exec("delete from StoragePackageLog where StorageID = ? and PackageID = ? 
and UserID = ?", storageID, packageID, userID) - return err -} diff --git a/common/pkgs/ioswitch2/ioswitch.go b/common/pkgs/ioswitch2/ioswitch.go deleted file mode 100644 index 48b43ee..0000000 --- a/common/pkgs/ioswitch2/ioswitch.go +++ /dev/null @@ -1,23 +0,0 @@ -package ioswitch2 - -import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" -) - -type NodeProps struct { - From From - To To -} - -type ValueVarType int - -const ( - StringValueVar ValueVarType = iota - SignalValueVar -) - -type VarProps struct { - StreamIndex int // 流的编号,只在StreamVar上有意义 - ValueType ValueVarType // 值类型,只在ValueVar上有意义 - Var exec.Var // 生成Plan的时候创建的对应的Var -} diff --git a/common/pkgs/ioswitch2/ops2/chunked.go b/common/pkgs/ioswitch2/ops2/chunked.go index 0962e11..b4087b9 100644 --- a/common/pkgs/ioswitch2/ops2/chunked.go +++ b/common/pkgs/ioswitch2/ops2/chunked.go @@ -11,7 +11,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/common/utils/io2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" "golang.org/x/sync/semaphore" ) @@ -101,24 +100,54 @@ func (o *ChunkedJoin) String() string { ) } -type ChunkedSplitType struct { - OutputCount int - ChunkSize int +type ChunkedSplitNode struct { + dag.NodeBase + ChunkSize int } -func (t *ChunkedSplitType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) - for i := 0; i < t.OutputCount; i++ { - dag.NodeNewOutputStream(node, &ioswitch2.VarProps{ - StreamIndex: i, - }) +func (b *GraphNodeBuilder) NewChunkedSplit(chunkSize int) *ChunkedSplitNode { + node := &ChunkedSplitNode{ + ChunkSize: chunkSize, + } + b.AddNode(node) + return node +} + +func (t *ChunkedSplitNode) Split(input *dag.StreamVar, cnt int) { + t.InputStreams().EnsureSize(1) + input.Connect(t, 0) + t.OutputStreams().Resize(cnt) + for i := 0; i < cnt; i++ { + t.OutputStreams().Setup(t, t.Graph().NewStreamVar(), i) + } +} + +func (t *ChunkedSplitNode) SubStream(idx 
int) *dag.StreamVar { + return t.OutputStreams().Get(idx) +} + +func (t *ChunkedSplitNode) SplitCount() int { + return t.OutputStreams().Len() +} + +func (t *ChunkedSplitNode) Clear() { + if t.InputStreams().Len() == 0 { + return } + + t.InputStreams().Get(0).Disconnect(t, 0) + t.InputStreams().Resize(0) + + for _, out := range t.OutputStreams().RawArray() { + out.DisconnectAll() + } + t.OutputStreams().Resize(0) } -func (t *ChunkedSplitType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *ChunkedSplitNode) GenerateOp() (exec.Op, error) { return &ChunkedSplit{ - Input: op.InputStreams[0].Var, - Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { + Input: t.InputStreams().Get(0).Var, + Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), ChunkSize: t.ChunkSize, @@ -126,30 +155,50 @@ func (t *ChunkedSplitType) GenerateOp(op *dag.Node) (exec.Op, error) { }, nil } -func (t *ChunkedSplitType) String(node *dag.Node) string { - return fmt.Sprintf("ChunkedSplit[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) +// func (t *ChunkedSplitNode) String() string { +// return fmt.Sprintf("ChunkedSplit[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) +// } + +type ChunkedJoinNode struct { + dag.NodeBase + ChunkSize int } -type ChunkedJoinType struct { - InputCount int - ChunkSize int +func (b *GraphNodeBuilder) NewChunkedJoin(chunkSize int) *ChunkedJoinNode { + node := &ChunkedJoinNode{ + ChunkSize: chunkSize, + } + b.AddNode(node) + node.OutputStreams().SetupNew(node, b.Graph.NewStreamVar()) + return node } -func (t *ChunkedJoinType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, t.InputCount) - dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) +func (t *ChunkedJoinNode) AddInput(str *dag.StreamVar) { + idx := t.InputStreams().EnlargeOne() + str.Connect(t, idx) } -func (t *ChunkedJoinType) GenerateOp(op *dag.Node) (exec.Op, error) { 
+func (t *ChunkedJoinNode) Joined() *dag.StreamVar { + return t.OutputStreams().Get(0) +} + +func (t *ChunkedJoinNode) RemoveAllInputs() { + for i, in := range t.InputStreams().RawArray() { + in.Disconnect(t, i) + } + t.InputStreams().Resize(0) +} + +func (t *ChunkedJoinNode) GenerateOp() (exec.Op, error) { return &ChunkedJoin{ - Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { + Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), - Output: op.OutputStreams[0].Var, + Output: t.OutputStreams().Get(0).Var, ChunkSize: t.ChunkSize, }, nil } -func (t *ChunkedJoinType) String(node *dag.Node) string { - return fmt.Sprintf("ChunkedJoin[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) -} +// func (t *ChunkedJoinType) String() string { +// return fmt.Sprintf("ChunkedJoin[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitch2/ops2/clone.go b/common/pkgs/ioswitch2/ops2/clone.go index 45129a8..66d1194 100644 --- a/common/pkgs/ioswitch2/ops2/clone.go +++ b/common/pkgs/ioswitch2/ops2/clone.go @@ -74,48 +74,70 @@ func (o *CloneVar) String() string { return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw.GetID(), utils.FormatVarIDs(o.Cloneds)) } -type CloneStreamType struct{} +type CloneStreamType struct { + dag.NodeBase +} + +func (b *GraphNodeBuilder) NewCloneStream() *CloneStreamType { + node := &CloneStreamType{} + b.AddNode(node) + return node +} + +func (t *CloneStreamType) SetInput(raw *dag.StreamVar) { + t.InputStreams().EnsureSize(1) + raw.Connect(t, 0) +} -func (t *CloneStreamType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) +func (t *CloneStreamType) NewOutput() *dag.StreamVar { + output := t.Graph().NewStreamVar() + t.OutputStreams().SetupNew(t, output) + return output } -func (t *CloneStreamType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *CloneStreamType) GenerateOp() (exec.Op, 
error) { return &CloneStream{ - Raw: op.InputStreams[0].Var, - Cloneds: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { + Raw: t.InputStreams().Get(0).Var, + Cloneds: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), }, nil } -func (t *CloneStreamType) NewOutput(node *dag.Node) *dag.StreamVar { - return dag.NodeNewOutputStream(node, nil) +// func (t *CloneStreamType) String() string { +// return fmt.Sprintf("CloneStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } + +type CloneVarType struct { + dag.NodeBase } -func (t *CloneStreamType) String(node *dag.Node) string { - return fmt.Sprintf("CloneStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +func (b *GraphNodeBuilder) NewCloneValue() *CloneVarType { + node := &CloneVarType{} + b.AddNode(node) + return node } -type CloneVarType struct{} +func (t *CloneVarType) SetInput(raw *dag.ValueVar) { + t.InputValues().EnsureSize(1) + raw.Connect(t, 0) +} -func (t *CloneVarType) InitNode(node *dag.Node) { - dag.NodeDeclareInputValue(node, 1) +func (t *CloneVarType) NewOutput() *dag.ValueVar { + output := t.Graph().NewValueVar(t.InputValues().Get(0).Type) + t.OutputValues().SetupNew(t, output) + return output } -func (t *CloneVarType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *CloneVarType) GenerateOp() (exec.Op, error) { return &CloneVar{ - Raw: op.InputValues[0].Var, - Cloneds: lo.Map(op.OutputValues, func(v *dag.ValueVar, idx int) exec.Var { + Raw: t.InputValues().Get(0).Var, + Cloneds: lo.Map(t.OutputValues().RawArray(), func(v *dag.ValueVar, idx int) exec.Var { return v.Var }), }, nil } -func (t *CloneVarType) NewOutput(node *dag.Node) *dag.ValueVar { - return dag.NodeNewOutputValue(node, node.InputValues[0].Type, nil) -} - -func (t *CloneVarType) String(node *dag.Node) string { - return fmt.Sprintf("CloneVar[]%v%v", formatStreamIO(node), formatValueIO(node)) -} +// func (t *CloneVarType) String() string { +// 
return fmt.Sprintf("CloneVar[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitch2/ops2/ec.go b/common/pkgs/ioswitch2/ops2/ec.go index 32f8163..246151b 100644 --- a/common/pkgs/ioswitch2/ops2/ec.go +++ b/common/pkgs/ioswitch2/ops2/ec.go @@ -14,7 +14,6 @@ import ( "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/sync2" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" "golang.org/x/sync/semaphore" ) @@ -204,15 +203,43 @@ func (o *ECMultiply) String() string { ) } -type MultiplyType struct { +type ECMultiplyNode struct { + dag.NodeBase EC cdssdk.ECRedundancy InputIndexes []int OutputIndexes []int } -func (t *MultiplyType) InitNode(node *dag.Node) {} +func (b *GraphNodeBuilder) NewECMultiply(ec cdssdk.ECRedundancy) *ECMultiplyNode { + node := &ECMultiplyNode{ + EC: ec, + } + b.AddNode(node) + return node +} + +func (t *ECMultiplyNode) AddInput(str *dag.StreamVar, dataIndex int) { + t.InputIndexes = append(t.InputIndexes, dataIndex) + idx := t.InputStreams().EnlargeOne() + str.Connect(t, idx) +} + +func (t *ECMultiplyNode) RemoveAllInputs() { + for i, in := range t.InputStreams().RawArray() { + in.Disconnect(t, i) + } + t.InputStreams().Resize(0) + t.InputIndexes = nil +} + +func (t *ECMultiplyNode) NewOutput(dataIndex int) *dag.StreamVar { + t.OutputIndexes = append(t.OutputIndexes, dataIndex) + output := t.Graph().NewStreamVar() + t.OutputStreams().SetupNew(t, output) + return output +} -func (t *MultiplyType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *ECMultiplyNode) GenerateOp() (exec.Op, error) { rs, err := ec.NewRs(t.EC.K, t.EC.N) if err != nil { return nil, err @@ -224,23 +251,12 @@ func (t *MultiplyType) GenerateOp(op *dag.Node) (exec.Op, error) { return &ECMultiply{ Coef: coef, - Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), - Outputs: lo.Map(op.OutputStreams, func(v 
*dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), + Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), + Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), ChunkSize: t.EC.ChunkSize, }, nil } -func (t *MultiplyType) AddInput(node *dag.Node, str *dag.StreamVar, dataIndex int) { - t.InputIndexes = append(t.InputIndexes, dataIndex) - node.InputStreams = append(node.InputStreams, str) - str.To(node, len(node.InputStreams)-1) -} - -func (t *MultiplyType) NewOutput(node *dag.Node, dataIndex int) *dag.StreamVar { - t.OutputIndexes = append(t.OutputIndexes, dataIndex) - return dag.NodeNewOutputStream(node, &ioswitch2.VarProps{StreamIndex: dataIndex}) -} - -func (t *MultiplyType) String(node *dag.Node) string { - return fmt.Sprintf("Multiply[]%v%v", formatStreamIO(node), formatValueIO(node)) -} +// func (t *MultiplyType) String() string { +// return fmt.Sprintf("Multiply[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitch2/ops2/file.go b/common/pkgs/ioswitch2/ops2/file.go index b6bd994..33dba33 100644 --- a/common/pkgs/ioswitch2/ops2/file.go +++ b/common/pkgs/ioswitch2/ops2/file.go @@ -11,7 +11,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" ) func init() { @@ -80,36 +79,66 @@ func (o *FileRead) String() string { return fmt.Sprintf("FileRead %s -> %v", o.FilePath, o.Output.ID) } -type FileReadType struct { +type FileReadNode struct { + dag.NodeBase FilePath string } -func (t *FileReadType) InitNode(node *dag.Node) { - dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) +func (b *GraphNodeBuilder) NewFileRead(filePath string) *FileReadNode { + node := &FileReadNode{ + FilePath: filePath, + } + b.AddNode(node) + 
node.OutputStreams().SetupNew(node, b.NewStreamVar()) + return node +} + +func (t *FileReadNode) Output() dag.StreamSlot { + return dag.StreamSlot{ + Var: t.OutputStreams().Get(0), + Index: 0, + } } -func (t *FileReadType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *FileReadNode) GenerateOp() (exec.Op, error) { return &FileRead{ - Output: op.OutputStreams[0].Var, + Output: t.OutputStreams().Get(0).Var, FilePath: t.FilePath, }, nil } -func (t *FileReadType) String(node *dag.Node) string { - return fmt.Sprintf("FileRead[%s]%v%v", t.FilePath, formatStreamIO(node), formatValueIO(node)) -} +// func (t *FileReadType) String() string { +// return fmt.Sprintf("FileRead[%s]%v%v", t.FilePath, formatStreamIO(node), formatValueIO(node)) +// } -type FileWriteType struct { +type FileWriteNode struct { + dag.NodeBase FilePath string } -func (t *FileWriteType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) +func (b *GraphNodeBuilder) NewFileWrite(filePath string) *FileWriteNode { + node := &FileWriteNode{ + FilePath: filePath, + } + b.AddNode(node) + return node +} + +func (t *FileWriteNode) Input() dag.StreamSlot { + return dag.StreamSlot{ + Var: t.InputStreams().Get(0), + Index: 0, + } +} + +func (t *FileWriteNode) SetInput(str *dag.StreamVar) { + t.InputStreams().EnsureSize(1) + str.Connect(t, 0) } -func (t *FileWriteType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *FileWriteNode) GenerateOp() (exec.Op, error) { return &FileWrite{ - Input: op.InputStreams[0].Var, + Input: t.InputStreams().Get(0).Var, FilePath: t.FilePath, }, nil } diff --git a/common/pkgs/ioswitch2/ops2/ipfs.go b/common/pkgs/ioswitch2/ops2/ipfs.go index d6604e7..f8e4e2a 100644 --- a/common/pkgs/ioswitch2/ops2/ipfs.go +++ b/common/pkgs/ioswitch2/ops2/ipfs.go @@ -12,7 +12,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/io2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - 
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" ) func init() { @@ -94,44 +93,78 @@ func (o *IPFSWrite) String() string { return fmt.Sprintf("IPFSWrite %v -> %v", o.Input.ID, o.FileHash.ID) } -type IPFSReadType struct { +type IPFSReadNode struct { + dag.NodeBase FileHash string Option ipfs.ReadOption } -func (t *IPFSReadType) InitNode(node *dag.Node) { - dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) +func (b *GraphNodeBuilder) NewIPFSRead(fileHash string, option ipfs.ReadOption) *IPFSReadNode { + node := &IPFSReadNode{ + FileHash: fileHash, + Option: option, + } + b.AddNode(node) + node.OutputStreams().SetupNew(node, b.NewStreamVar()) + return node +} + +func (t *IPFSReadNode) Output() dag.StreamSlot { + return dag.StreamSlot{ + Var: t.OutputStreams().Get(0), + Index: 0, + } } -func (t *IPFSReadType) GenerateOp(n *dag.Node) (exec.Op, error) { +func (t *IPFSReadNode) GenerateOp() (exec.Op, error) { return &IPFSRead{ - Output: n.OutputStreams[0].Var, + Output: t.OutputStreams().Get(0).Var, FileHash: t.FileHash, Option: t.Option, }, nil } -func (t *IPFSReadType) String(node *dag.Node) string { - return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node)) -} +// func (t *IPFSReadType) String() string { +// return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node)) +// } -type IPFSWriteType struct { +type IPFSWriteNode struct { + dag.NodeBase FileHashStoreKey string - Range exec.Range } -func (t *IPFSWriteType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) - dag.NodeNewOutputValue(node, dag.StringValueVar, &ioswitch2.VarProps{}) +func (b *GraphNodeBuilder) NewIPFSWrite(fileHashStoreKey string) *IPFSWriteNode { + node := &IPFSWriteNode{ + FileHashStoreKey: fileHashStoreKey, + } + b.AddNode(node) + return node +} + +func (t *IPFSWriteNode) SetInput(input *dag.StreamVar) { + 
t.InputStreams().EnsureSize(1) + input.Connect(t, 0) + t.OutputValues().SetupNew(t, t.Graph().NewValueVar(dag.StringValueVar)) +} + +func (t *IPFSWriteNode) Input() dag.StreamSlot { + return dag.StreamSlot{ + Var: t.InputStreams().Get(0), + Index: 0, + } } -func (t *IPFSWriteType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *IPFSWriteNode) FileHashVar() *dag.ValueVar { + return t.OutputValues().Get(0) +} + +func (t *IPFSWriteNode) GenerateOp() (exec.Op, error) { return &IPFSWrite{ - Input: op.InputStreams[0].Var, - FileHash: op.OutputValues[0].Var.(*exec.StringVar), + Input: t.InputStreams().Get(0).Var, + FileHash: t.OutputValues().Get(0).Var.(*exec.StringVar), }, nil } -func (t *IPFSWriteType) String(node *dag.Node) string { - return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) -} +// func (t *IPFSWriteType) String() string { +// return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitch2/ops2/ops.go b/common/pkgs/ioswitch2/ops2/ops.go index c7309b6..c12ce53 100644 --- a/common/pkgs/ioswitch2/ops2/ops.go +++ b/common/pkgs/ioswitch2/ops2/ops.go @@ -1,75 +1,93 @@ package ops2 import ( - "fmt" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops" ) -func formatStreamIO(node *dag.Node) string { - is := "" - for i, in := range node.InputStreams { - if i > 0 { - is += "," - } - - if in == nil { - is += "." - } else { - is += fmt.Sprintf("%v", in.ID) - } - } - - os := "" - for i, out := range node.OutputStreams { - if i > 0 { - os += "," - } - - if out == nil { - os += "." 
- } else { - os += fmt.Sprintf("%v", out.ID) - } - } - - if is == "" && os == "" { - return "" - } - - return fmt.Sprintf("S{%s>%s}", is, os) +type GraphNodeBuilder struct { + *ops.GraphNodeBuilder +} + +func NewGraphNodeBuilder() *GraphNodeBuilder { + return &GraphNodeBuilder{ops.NewGraphNodeBuilder()} +} + +type FromNode interface { + dag.Node + Output() dag.StreamSlot } -func formatValueIO(node *dag.Node) string { - is := "" - for i, in := range node.InputValues { - if i > 0 { - is += "," - } - - if in == nil { - is += "." - } else { - is += fmt.Sprintf("%v", in.ID) - } - } - - os := "" - for i, out := range node.OutputValues { - if i > 0 { - os += "," - } - - if out == nil { - os += "." - } else { - os += fmt.Sprintf("%v", out.ID) - } - } - - if is == "" && os == "" { - return "" - } - - return fmt.Sprintf("V{%s>%s}", is, os) +type ToNode interface { + dag.Node + Input() dag.StreamSlot + SetInput(input *dag.StreamVar) } + +// func formatStreamIO(node *dag.Node) string { +// is := "" +// for i, in := range node.InputStreams { +// if i > 0 { +// is += "," +// } + +// if in == nil { +// is += "." +// } else { +// is += fmt.Sprintf("%v", in.ID) +// } +// } + +// os := "" +// for i, out := range node.OutputStreams { +// if i > 0 +// os += "," +// } + +// if out == nil { +// os += "." +// } else { +// os += fmt.Sprintf("%v", out.ID) +// } +// } + +// if is == "" && os == "" { +// return "" +// } + +// return fmt.Sprintf("S{%s>%s}", is, os) +// } + +// func formatValueIO(node *dag.Node) string { +// is := "" +// for i, in := range node.InputValues { +// if i > 0 { +// is += "," +// } + +// if in == nil { +// is += "." +// } else { +// is += fmt.Sprintf("%v", in.ID) +// } +// } + +// os := "" +// for i, out := range node.OutputValues { +// if i > 0 { +// os += "," +// } + +// if out == nil { +// os += "." 
+// } else { +// os += fmt.Sprintf("%v", out.ID) +// } +// } + +// if is == "" && os == "" { +// return "" +// } + +// return fmt.Sprintf("V{%s>%s}", is, os) +// } diff --git a/common/pkgs/ioswitch2/ops2/range.go b/common/pkgs/ioswitch2/ops2/range.go index 8e68238..1475b4e 100644 --- a/common/pkgs/ioswitch2/ops2/range.go +++ b/common/pkgs/ioswitch2/ops2/range.go @@ -10,7 +10,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/math2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" ) func init() { @@ -76,24 +75,35 @@ func (o *Range) String() string { return fmt.Sprintf("Range(%v+%v) %v -> %v", o.Offset, o.Length, o.Input.ID, o.Output.ID) } -type RangeType struct { +type RangeNode struct { + dag.NodeBase Range exec.Range } -func (t *RangeType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) - dag.NodeNewOutputStream(node, &ioswitch2.VarProps{}) +func (b *GraphNodeBuilder) NewRange() *RangeNode { + node := &RangeNode{} + b.AddNode(node) + return node } -func (t *RangeType) GenerateOp(n *dag.Node) (exec.Op, error) { +func (t *RangeNode) RangeStream(input *dag.StreamVar, rng exec.Range) *dag.StreamVar { + t.InputStreams().EnsureSize(1) + input.Connect(t, 0) + t.Range = rng + output := t.Graph().NewStreamVar() + t.OutputStreams().Setup(t, output, 0) + return output +} + +func (t *RangeNode) GenerateOp() (exec.Op, error) { return &Range{ - Input: n.InputStreams[0].Var, - Output: n.OutputStreams[0].Var, + Input: t.InputStreams().Get(0).Var, + Output: t.OutputStreams().Get(0).Var, Offset: t.Range.Offset, Length: t.Range.Length, }, nil } -func (t *RangeType) String(node *dag.Node) string { - return fmt.Sprintf("Range[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) -} +// func (t *RangeType) String() string { +// return fmt.Sprintf("Range[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), 
formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index 3eff6b5..21870c2 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -7,7 +7,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops" "gitlink.org.cn/cloudream/common/pkgs/ipfs" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/lo2" @@ -26,16 +25,27 @@ func NewParser(ec cdssdk.ECRedundancy) *DefaultParser { } } +type IndexedStream struct { + Stream *dag.StreamVar + DataIndex int +} + type ParseContext struct { Ft ioswitch2.FromTo - DAG *dag.Graph + DAG *ops2.GraphNodeBuilder // 为了产生所有To所需的数据范围,而需要From打开的范围。 // 这个范围是基于整个文件的,且上下界都取整到条带大小的整数倍,因此上界是有可能超过文件大小的。 - StreamRange exec.Range + ToNodes map[ioswitch2.To]ops2.ToNode + IndexedStreams []IndexedStream + StreamRange exec.Range } func (p *DefaultParser) Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error { - ctx := ParseContext{Ft: ft, DAG: dag.NewGraph()} + ctx := ParseContext{ + Ft: ft, + DAG: ops2.NewGraphNodeBuilder(), + ToNodes: make(map[ioswitch2.To]ops2.ToNode), + } // 分成两个阶段: // 1. 
基于From和To生成更多指令,初步匹配to的需求 @@ -43,7 +53,7 @@ func (p *DefaultParser) Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) erro // 计算一下打开流的范围 p.calcStreamRange(&ctx) - err := p.extend(&ctx, ft) + err := p.extend(&ctx) if err != nil { return err } @@ -82,20 +92,16 @@ func (p *DefaultParser) Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) erro p.generateClone(&ctx) p.generateRange(&ctx) - return plan.Generate(ctx.DAG, blder) + return plan.Generate(ctx.DAG.Graph, blder) } func (p *DefaultParser) findOutputStream(ctx *ParseContext, streamIndex int) *dag.StreamVar { var ret *dag.StreamVar - ctx.DAG.Walk(func(n *dag.Node) bool { - for _, o := range n.OutputStreams { - if o != nil && ioswitch2.SProps(o).StreamIndex == streamIndex { - ret = o - return false - } + for _, s := range ctx.IndexedStreams { + if s.DataIndex == streamIndex { + ret = s.Stream + break } - return true - }) - + } return ret } @@ -134,77 +140,86 @@ func (p *DefaultParser) calcStreamRange(ctx *ParseContext) { ctx.StreamRange = rng } -func (p *DefaultParser) extend(ctx *ParseContext, ft ioswitch2.FromTo) error { - for _, fr := range ft.Froms { - frNode, err := p.buildFromNode(ctx, &ft, fr) +func (p *DefaultParser) extend(ctx *ParseContext) error { + for _, fr := range ctx.Ft.Froms { + frNode, err := p.buildFromNode(ctx, fr) if err != nil { return err } + ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ + Stream: frNode.Output().Var, + DataIndex: fr.GetDataIndex(), + }) + // 对于完整文件的From,生成Split指令 if fr.GetDataIndex() == -1 { - node, _ := dag.NewNode(ctx.DAG, &ops2.ChunkedSplitType{ChunkSize: p.EC.ChunkSize, OutputCount: p.EC.K}, &ioswitch2.NodeProps{}) - frNode.OutputStreams[0].To(node, 0) + splitNode := ctx.DAG.NewChunkedSplit(p.EC.ChunkSize) + splitNode.Split(frNode.Output().Var, p.EC.K) + for i := 0; i < p.EC.K; i++ { + ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ + Stream: splitNode.SubStream(i), + DataIndex: i, + }) + } } } // 
如果有K个不同的文件块流,则生成Multiply指令,同时针对其生成的流,生成Join指令 ecInputStrs := make(map[int]*dag.StreamVar) -loop: - for _, o := range ctx.DAG.Nodes { - for _, s := range o.OutputStreams { - prop := ioswitch2.SProps(s) - if prop.StreamIndex >= 0 && ecInputStrs[prop.StreamIndex] == nil { - ecInputStrs[prop.StreamIndex] = s - if len(ecInputStrs) == p.EC.K { - break loop - } + for _, s := range ctx.IndexedStreams { + if s.DataIndex >= 0 && ecInputStrs[s.DataIndex] == nil { + ecInputStrs[s.DataIndex] = s.Stream + if len(ecInputStrs) == p.EC.K { + break } } } + if len(ecInputStrs) == p.EC.K { - mulNode, mulType := dag.NewNode(ctx.DAG, &ops2.MultiplyType{ - EC: p.EC, - }, &ioswitch2.NodeProps{}) + mulNode := ctx.DAG.NewECMultiply(p.EC) - for _, s := range ecInputStrs { - mulType.AddInput(mulNode, s, ioswitch2.SProps(s).StreamIndex) + for i, s := range ecInputStrs { + mulNode.AddInput(s, i) } for i := 0; i < p.EC.N; i++ { - mulType.NewOutput(mulNode, i) + ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ + Stream: mulNode.NewOutput(i), + DataIndex: i, + }) } - joinNode, _ := dag.NewNode(ctx.DAG, &ops2.ChunkedJoinType{ - InputCount: p.EC.K, - ChunkSize: p.EC.ChunkSize, - }, &ioswitch2.NodeProps{}) - + joinNode := ctx.DAG.NewChunkedJoin(p.EC.ChunkSize) for i := 0; i < p.EC.K; i++ { // 不可能找不到流 - p.findOutputStream(ctx, i).To(joinNode, i) + joinNode.AddInput(p.findOutputStream(ctx, i)) } - ioswitch2.SProps(joinNode.OutputStreams[0]).StreamIndex = -1 + ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{ + Stream: joinNode.Joined(), + DataIndex: -1, + }) } // 为每一个To找到一个输入流 - for _, to := range ft.Toes { - n, err := p.buildToNode(ctx, &ft, to) + for _, to := range ctx.Ft.Toes { + toNode, err := p.buildToNode(ctx, to) if err != nil { return err } + ctx.ToNodes[to] = toNode str := p.findOutputStream(ctx, to.GetDataIndex()) if str == nil { return fmt.Errorf("no output stream found for data index %d", to.GetDataIndex()) } - str.To(n, 0) + toNode.SetInput(str) } return nil } 
-func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch2.FromTo, f ioswitch2.From) (*dag.Node, error) { +func (p *DefaultParser) buildFromNode(ctx *ParseContext, f ioswitch2.From) (ops2.FromNode, error) { var repRange exec.Range var blkRange exec.Range @@ -220,16 +235,10 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch2.FromTo, f switch f := f.(type) { case *ioswitch2.FromNode: - n, t := dag.NewNode(ctx.DAG, &ops2.IPFSReadType{ - FileHash: f.FileHash, - Option: ipfs.ReadOption{ - Offset: 0, - Length: -1, - }, - }, &ioswitch2.NodeProps{ - From: f, + t := ctx.DAG.NewIPFSRead(f.FileHash, ipfs.ReadOption{ + Offset: 0, + Length: -1, }) - ioswitch2.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex if f.DataIndex == -1 { t.Option.Offset = repRange.Offset @@ -244,17 +253,16 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch2.FromTo, f } if f.Node != nil { - n.Env.ToEnvWorker(&ioswitch2.AgentWorker{Node: *f.Node}) - n.Env.Pinned = true + t.Env().ToEnvWorker(&ioswitch2.AgentWorker{Node: *f.Node}) + t.Env().Pinned = true } - return n, nil + return t, nil case *ioswitch2.FromDriver: - n, _ := dag.NewNode(ctx.DAG, &ops.FromDriverType{Handle: f.Handle}, &ioswitch2.NodeProps{From: f}) - n.Env.ToEnvDriver() - n.Env.Pinned = true - ioswitch2.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex + n := ctx.DAG.NewFromDriver(f.Handle) + n.Env().ToEnvDriver() + n.Env().Pinned = true if f.DataIndex == -1 { f.Handle.RangeHint.Offset = repRange.Offset @@ -271,24 +279,19 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch2.FromTo, f } } -func (p *DefaultParser) buildToNode(ctx *ParseContext, ft *ioswitch2.FromTo, t ioswitch2.To) (*dag.Node, error) { +func (p *DefaultParser) buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToNode, error) { switch t := t.(type) { case *ioswitch2.ToNode: - n, _ := dag.NewNode(ctx.DAG, &ops2.IPFSWriteType{ - FileHashStoreKey: t.FileHashStoreKey, - Range: t.Range, - }, 
&ioswitch2.NodeProps{ - To: t, - }) - n.Env.ToEnvWorker(&ioswitch2.AgentWorker{Node: t.Node}) - n.Env.Pinned = true + n := ctx.DAG.NewIPFSWrite(t.FileHashStoreKey) + n.Env().ToEnvWorker(&ioswitch2.AgentWorker{Node: t.Node}) + n.Env().Pinned = true return n, nil case *ioswitch2.ToDriver: - n, _ := dag.NewNode(ctx.DAG, &ops.ToDriverType{Handle: t.Handle, Range: t.Range}, &ioswitch2.NodeProps{To: t}) - n.Env.ToEnvDriver() - n.Env.Pinned = true + n := ctx.DAG.NewToDriver(t.Handle) + n.Env().ToEnvDriver() + n.Env().Pinned = true return n, nil @@ -301,15 +304,12 @@ func (p *DefaultParser) buildToNode(ctx *ParseContext, ft *ioswitch2.FromTo, t i func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool { changed := false - dag.WalkOnlyType[*ops2.ChunkedJoinType](ctx.DAG, func(node *dag.Node, typ *ops2.ChunkedJoinType) bool { - if len(node.OutputStreams[0].Toes) > 0 { + dag.WalkOnlyType[*ops2.ChunkedJoinNode](ctx.DAG.Graph, func(node *ops2.ChunkedJoinNode) bool { + if node.InputStreams().Len() > 0 { return true } - for _, in := range node.InputStreams { - in.NotTo(node) - } - + node.RemoveAllInputs() ctx.DAG.RemoveNode(node) return true }) @@ -320,25 +320,23 @@ func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool { // 减少未使用的Multiply指令的输出流。如果减少到0,则删除该指令 func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool { changed := false - dag.WalkOnlyType[*ops2.MultiplyType](ctx.DAG, func(node *dag.Node, typ *ops2.MultiplyType) bool { - for i2, out := range node.OutputStreams { - if len(out.Toes) > 0 { + dag.WalkOnlyType[*ops2.ECMultiplyNode](ctx.DAG.Graph, func(node *ops2.ECMultiplyNode) bool { + outArr := node.OutputStreams().RawArray() + for i2, out := range outArr { + if out.To().Len() > 0 { continue } - node.OutputStreams[i2] = nil - typ.OutputIndexes[i2] = -2 + outArr[i2] = nil + node.OutputIndexes[i2] = -2 changed = true } - node.OutputStreams = lo2.RemoveAllDefault(node.OutputStreams) - typ.OutputIndexes = lo2.RemoveAll(typ.OutputIndexes, 
-2) + node.OutputStreams().SetRawArray(lo2.RemoveAllDefault(outArr)) + node.OutputIndexes = lo2.RemoveAll(node.OutputIndexes, -2) // 如果所有输出流都被删除,则删除该指令 - if len(node.OutputStreams) == 0 { - for _, in := range node.InputStreams { - in.NotTo(node) - } - + if node.OutputStreams().Len() == 0 { + node.RemoveAllInputs() ctx.DAG.RemoveNode(node) changed = true } @@ -351,16 +349,16 @@ func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool { // 删除未使用的Split指令 func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool { changed := false - dag.WalkOnlyType[*ops2.ChunkedSplitType](ctx.DAG, func(node *dag.Node, typ *ops2.ChunkedSplitType) bool { + dag.WalkOnlyType[*ops2.ChunkedSplitNode](ctx.DAG.Graph, func(typ *ops2.ChunkedSplitNode) bool { // Split出来的每一个流都没有被使用,才能删除这个指令 - for _, out := range node.OutputStreams { - if len(out.Toes) > 0 { + for _, out := range typ.OutputStreams().RawArray() { + if out.To().Len() > 0 { return true } } - node.InputStreams[0].NotTo(node) - ctx.DAG.RemoveNode(node) + typ.Clear() + ctx.DAG.RemoveNode(typ) changed = true return true }) @@ -372,43 +370,44 @@ func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool { func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool { changed := false - dag.WalkOnlyType[*ops2.ChunkedSplitType](ctx.DAG, func(splitNode *dag.Node, typ *ops2.ChunkedSplitType) bool { + dag.WalkOnlyType[*ops2.ChunkedSplitNode](ctx.DAG.Graph, func(splitNode *ops2.ChunkedSplitNode) bool { // Split指令的每一个输出都有且只有一个目的地 - var joinNode *dag.Node - for _, out := range splitNode.OutputStreams { - if len(out.Toes) != 1 { - continue + var dstNode dag.Node + for _, out := range splitNode.OutputStreams().RawArray() { + if out.To().Len() != 1 { + return true } - if joinNode == nil { - joinNode = out.Toes[0].Node - } else if joinNode != out.Toes[0].Node { + if dstNode == nil { + dstNode = out.To().Get(0).Node + } else if dstNode != out.To().Get(0).Node { return true } } - if joinNode == nil { + if dstNode == 
nil { return true } // 且这个目的地要是一个Join指令 - _, ok := joinNode.Type.(*ops2.ChunkedJoinType) + joinNode, ok := dstNode.(*ops2.ChunkedJoinNode) if !ok { return true } // 同时这个Join指令的输入也必须全部来自Split指令的输出。 // 由于上面判断了Split指令的输出目的地都相同,所以这里只要判断Join指令的输入数量是否与Split指令的输出数量相同即可 - if len(joinNode.InputStreams) != len(splitNode.OutputStreams) { + if joinNode.InputStreams().Len() != splitNode.OutputStreams().Len() { return true } // 所有条件都满足,可以开始省略操作,将Join操作的目的地的输入流替换为Split操作的输入流: // F->Split->Join->T 变换为:F->T - splitNode.InputStreams[0].NotTo(splitNode) - for _, out := range joinNode.OutputStreams[0].Toes { - splitNode.InputStreams[0].To(out.Node, out.SlotIndex) + splitInput := splitNode.InputStreams().Get(0) + for _, to := range joinNode.Joined().To().RawArray() { + splitInput.Connect(to.Node, to.SlotIndex) } + splitInput.Disconnect(splitNode, 0) // 并删除这两个指令 ctx.DAG.RemoveNode(joinNode) @@ -426,21 +425,21 @@ func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool { // 所以理论上不会出现有指令的位置始终无法确定的情况。 func (p *DefaultParser) pin(ctx *ParseContext) bool { changed := false - ctx.DAG.Walk(func(node *dag.Node) bool { - if node.Env.Pinned { + ctx.DAG.Walk(func(node dag.Node) bool { + if node.Env().Pinned { return true } var toEnv *dag.NodeEnv - for _, out := range node.OutputStreams { - for _, to := range out.Toes { - if to.Node.Env.Type == dag.EnvUnknown { + for _, out := range node.OutputStreams().RawArray() { + for _, to := range out.To().RawArray() { + if to.Node.Env().Type == dag.EnvUnknown { continue } if toEnv == nil { - toEnv = &to.Node.Env - } else if !toEnv.Equals(to.Node.Env) { + toEnv = to.Node.Env() + } else if !toEnv.Equals(to.Node.Env()) { toEnv = nil break } @@ -448,35 +447,35 @@ func (p *DefaultParser) pin(ctx *ParseContext) bool { } if toEnv != nil { - if !node.Env.Equals(*toEnv) { + if !node.Env().Equals(toEnv) { changed = true } - node.Env = *toEnv + *node.Env() = *toEnv return true } // 否则根据输入流的始发地来固定 var fromEnv *dag.NodeEnv - for _, in := range node.InputStreams { - 
if in.From.Node.Env.Type == dag.EnvUnknown { + for _, in := range node.InputStreams().RawArray() { + if in.From().Node.Env().Type == dag.EnvUnknown { continue } if fromEnv == nil { - fromEnv = &in.From.Node.Env - } else if !fromEnv.Equals(in.From.Node.Env) { + fromEnv = in.From().Node.Env() + } else if !fromEnv.Equals(in.From().Node.Env()) { fromEnv = nil break } } if fromEnv != nil { - if !node.Env.Equals(*fromEnv) { + if !node.Env().Equals(fromEnv) { changed = true } - node.Env = *fromEnv + *node.Env() = *fromEnv } return true }) @@ -486,12 +485,12 @@ func (p *DefaultParser) pin(ctx *ParseContext) bool { // 对于所有未使用的流,增加Drop指令 func (p *DefaultParser) dropUnused(ctx *ParseContext) { - ctx.DAG.Walk(func(node *dag.Node) bool { - for _, out := range node.OutputStreams { - if len(out.Toes) == 0 { - n := ctx.DAG.NewNode(&ops.DropType{}, &ioswitch2.NodeProps{}) - n.Env = node.Env - out.To(n, 0) + ctx.DAG.Walk(func(node dag.Node) bool { + for _, out := range node.OutputStreams().RawArray() { + if out.To().Len() == 0 { + n := ctx.DAG.NewDropStream() + *n.Env() = *node.Env() + n.SetInput(out) } } return true @@ -500,43 +499,38 @@ func (p *DefaultParser) dropUnused(ctx *ParseContext) { // 为IPFS写入指令存储结果 func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) { - dag.WalkOnlyType[*ops2.IPFSWriteType](ctx.DAG, func(node *dag.Node, typ *ops2.IPFSWriteType) bool { - if typ.FileHashStoreKey == "" { + dag.WalkOnlyType[*ops2.IPFSWriteNode](ctx.DAG.Graph, func(n *ops2.IPFSWriteNode) bool { + if n.FileHashStoreKey == "" { return true } - n, t := dag.NewNode(ctx.DAG, &ops.StoreType{ - StoreKey: typ.FileHashStoreKey, - }, &ioswitch2.NodeProps{}) - n.Env.ToEnvDriver() - t.Store(n, node.OutputValues[0]) + storeNode := ctx.DAG.NewStore() + storeNode.Env().ToEnvDriver() + + storeNode.Store(n.FileHashStoreKey, n.FileHashVar()) return true }) } // 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回 func (p *DefaultParser) generateRange(ctx *ParseContext) { - 
ctx.DAG.Walk(func(node *dag.Node) bool { - props := ioswitch2.NProps(node) - if props.To == nil { - return true - } + for i := 0; i < len(ctx.Ft.Toes); i++ { + to := ctx.Ft.Toes[i] + toNode := ctx.ToNodes[to] - toDataIdx := props.To.GetDataIndex() - toRng := props.To.GetRange() + toDataIdx := to.GetDataIndex() + toRng := to.GetRange() if toDataIdx == -1 { - n := ctx.DAG.NewNode(&ops2.RangeType{ - Range: exec.Range{ - Offset: toRng.Offset - ctx.StreamRange.Offset, - Length: toRng.Length, - }, - }, &ioswitch2.NodeProps{}) - n.Env = node.InputStreams[0].From.Node.Env - - node.InputStreams[0].To(n, 0) - node.InputStreams[0].NotTo(node) - n.OutputStreams[0].To(node, 0) + n := ctx.DAG.NewRange() + toInput := toNode.Input() + *n.Env() = *toInput.Var.From().Node.Env() + rnged := n.RangeStream(toInput.Var, exec.Range{ + Offset: toRng.Offset - ctx.StreamRange.Offset, + Length: toRng.Length, + }) + toInput.Var.Disconnect(toNode, toInput.Index) + toNode.SetInput(rnged) } else { stripSize := int64(p.EC.ChunkSize * p.EC.K) @@ -544,54 +538,48 @@ func (p *DefaultParser) generateRange(ctx *ParseContext) { blkStart := blkStartIdx * int64(p.EC.ChunkSize) - n := ctx.DAG.NewNode(&ops2.RangeType{ - Range: exec.Range{ - Offset: toRng.Offset - blkStart, - Length: toRng.Length, - }, - }, &ioswitch2.NodeProps{}) - n.Env = node.InputStreams[0].From.Node.Env - - node.InputStreams[0].To(n, 0) - node.InputStreams[0].NotTo(node) - n.OutputStreams[0].To(node, 0) + n := ctx.DAG.NewRange() + toInput := toNode.Input() + *n.Env() = *toInput.Var.From().Node.Env() + rnged := n.RangeStream(toInput.Var, exec.Range{ + Offset: toRng.Offset - blkStart, + Length: toRng.Length, + }) + toInput.Var.Disconnect(toNode, toInput.Index) + toNode.SetInput(rnged) } - - return true - }) + } } // 生成Clone指令 func (p *DefaultParser) generateClone(ctx *ParseContext) { - ctx.DAG.Walk(func(node *dag.Node) bool { - for _, out := range node.OutputStreams { - if len(out.Toes) <= 1 { + ctx.DAG.Walk(func(node dag.Node) bool { + 
for _, out := range node.OutputStreams().RawArray() { + if out.To().Len() <= 1 { continue } - n, t := dag.NewNode(ctx.DAG, &ops2.CloneStreamType{}, &ioswitch2.NodeProps{}) - n.Env = node.Env - for _, to := range out.Toes { - str := t.NewOutput(n) - str.Props = &ioswitch2.VarProps{StreamIndex: ioswitch2.SProps(out).StreamIndex} - str.To(to.Node, to.SlotIndex) + t := ctx.DAG.NewCloneStream() + *t.Env() = *node.Env() + for _, to := range out.To().RawArray() { + t.NewOutput().Connect(to.Node, to.SlotIndex) } - out.Toes = nil - out.To(n, 0) + out.To().Resize(0) + t.SetInput(out) } - for _, out := range node.OutputValues { - if len(out.Toes) <= 1 { + for _, out := range node.OutputValues().RawArray() { + if out.To().Len() <= 1 { continue } - n, t := dag.NewNode(ctx.DAG, &ops2.CloneVarType{}, &ioswitch2.NodeProps{}) - n.Env = node.Env - for _, to := range out.Toes { - t.NewOutput(n).To(to.Node, to.SlotIndex) + t := ctx.DAG.NewCloneValue() + *t.Env() = *node.Env() + for _, to := range out.To().RawArray() { + t.NewOutput().Connect(to.Node, to.SlotIndex) } - out.Toes = nil - out.To(n, 0) + out.To().Resize(0) + t.SetInput(out) } return true diff --git a/common/pkgs/ioswitch2/utils.go b/common/pkgs/ioswitch2/utils.go deleted file mode 100644 index ad3cb20..0000000 --- a/common/pkgs/ioswitch2/utils.go +++ /dev/null @@ -1,17 +0,0 @@ -package ioswitch2 - -import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" -) - -func NProps(n *dag.Node) *NodeProps { - return dag.NProps[*NodeProps](n) -} - -func SProps(str *dag.StreamVar) *VarProps { - return dag.SProps[*VarProps](str) -} - -func VProps(v *dag.ValueVar) *VarProps { - return dag.VProps[*VarProps](v) -} diff --git a/common/pkgs/ioswitchlrc/agent_worker.go b/common/pkgs/ioswitchlrc/agent_worker.go index 81b280f..122a54a 100644 --- a/common/pkgs/ioswitchlrc/agent_worker.go +++ b/common/pkgs/ioswitchlrc/agent_worker.go @@ -5,16 +5,14 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - 
"gitlink.org.cn/cloudream/common/pkgs/types" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/common/utils/serder" stgglb "gitlink.org.cn/cloudream/storage/common/globals" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" ) -var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[exec.WorkerInfo]( - (*AgentWorker)(nil), -))) +// var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[exec.WorkerInfo]( +// (*AgentWorker)(nil), +// ))) type AgentWorker struct { Node cdssdk.Node diff --git a/common/pkgs/ioswitchlrc/ioswitch.go b/common/pkgs/ioswitchlrc/ioswitch.go deleted file mode 100644 index b6198a8..0000000 --- a/common/pkgs/ioswitchlrc/ioswitch.go +++ /dev/null @@ -1,23 +0,0 @@ -package ioswitchlrc - -import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" -) - -type NodeProps struct { - From From - To To -} - -type ValueVarType int - -const ( - StringValueVar ValueVarType = iota - SignalValueVar -) - -type VarProps struct { - StreamIndex int // 流的编号,只在StreamVar上有意义 - ValueType ValueVarType // 值类型,只在ValueVar上有意义 - Var exec.Var // 生成Plan的时候创建的对应的Var -} diff --git a/common/pkgs/ioswitchlrc/ops2/chunked.go b/common/pkgs/ioswitchlrc/ops2/chunked.go index 33ae0e6..9707c20 100644 --- a/common/pkgs/ioswitchlrc/ops2/chunked.go +++ b/common/pkgs/ioswitchlrc/ops2/chunked.go @@ -11,7 +11,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils" "gitlink.org.cn/cloudream/common/utils/io2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" "golang.org/x/sync/semaphore" ) @@ -101,24 +100,40 @@ func (o *ChunkedJoin) String() string { ) } -type ChunkedSplitType struct { - OutputCount int - ChunkSize int +type ChunkedSplitNode struct { + dag.NodeBase + ChunkSize int } -func (t *ChunkedSplitType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) - for i := 0; i < t.OutputCount; i++ { - 
dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{ - StreamIndex: i, - }) +func (b *GraphNodeBuilder) NewChunkedSplit(chunkSize int) *ChunkedSplitNode { + node := &ChunkedSplitNode{ + ChunkSize: chunkSize, + } + b.AddNode(node) + return node +} + +func (t *ChunkedSplitNode) Split(input *dag.StreamVar, cnt int) { + t.InputStreams().EnsureSize(1) + input.Connect(t, 0) + t.OutputStreams().Resize(cnt) + for i := 0; i < cnt; i++ { + t.OutputStreams().Setup(t, t.Graph().NewStreamVar(), i) } } -func (t *ChunkedSplitType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *ChunkedSplitNode) SubStream(idx int) *dag.StreamVar { + return t.OutputStreams().Get(idx) +} + +func (t *ChunkedSplitNode) SplitCount() int { + return t.OutputStreams().Len() +} + +func (t *ChunkedSplitNode) GenerateOp() (exec.Op, error) { return &ChunkedSplit{ - Input: op.InputStreams[0].Var, - Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { + Input: t.InputStreams().Get(0).Var, + Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), ChunkSize: t.ChunkSize, @@ -126,32 +141,43 @@ func (t *ChunkedSplitType) GenerateOp(op *dag.Node) (exec.Op, error) { }, nil } -func (t *ChunkedSplitType) String(node *dag.Node) string { - return fmt.Sprintf("ChunkedSplit[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) +// func (t *ChunkedSplitNode) String() string { +// return fmt.Sprintf("ChunkedSplit[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) +// } + +type ChunkedJoinNode struct { + dag.NodeBase + ChunkSize int } -type ChunkedJoinType struct { - InputCount int - ChunkSize int +func (b *GraphNodeBuilder) NewChunkedJoin(chunkSize int) *ChunkedJoinNode { + node := &ChunkedJoinNode{ + ChunkSize: chunkSize, + } + b.AddNode(node) + node.OutputStreams().SetupNew(node, b.Graph.NewStreamVar()) + return node } -func (t *ChunkedJoinType) InitNode(node *dag.Node) { - 
dag.NodeDeclareInputStream(node, t.InputCount) - dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{ - StreamIndex: -1, - }) +func (t *ChunkedJoinNode) AddInput(str *dag.StreamVar) { + idx := t.InputStreams().EnlargeOne() + str.Connect(t, idx) } -func (t *ChunkedJoinType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *ChunkedJoinNode) Joined() *dag.StreamVar { + return t.OutputStreams().Get(0) +} + +func (t *ChunkedJoinNode) GenerateOp() (exec.Op, error) { return &ChunkedJoin{ - Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { + Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), - Output: op.OutputStreams[0].Var, + Output: t.OutputStreams().Get(0).Var, ChunkSize: t.ChunkSize, }, nil } -func (t *ChunkedJoinType) String(node *dag.Node) string { - return fmt.Sprintf("ChunkedJoin[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) -} +// func (t *ChunkedJoinType) String() string { +// return fmt.Sprintf("ChunkedJoin[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitchlrc/ops2/clone.go b/common/pkgs/ioswitchlrc/ops2/clone.go index 45129a8..66d1194 100644 --- a/common/pkgs/ioswitchlrc/ops2/clone.go +++ b/common/pkgs/ioswitchlrc/ops2/clone.go @@ -74,48 +74,70 @@ func (o *CloneVar) String() string { return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw.GetID(), utils.FormatVarIDs(o.Cloneds)) } -type CloneStreamType struct{} +type CloneStreamType struct { + dag.NodeBase +} + +func (b *GraphNodeBuilder) NewCloneStream() *CloneStreamType { + node := &CloneStreamType{} + b.AddNode(node) + return node +} + +func (t *CloneStreamType) SetInput(raw *dag.StreamVar) { + t.InputStreams().EnsureSize(1) + raw.Connect(t, 0) +} -func (t *CloneStreamType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) +func (t *CloneStreamType) NewOutput() *dag.StreamVar { + output := t.Graph().NewStreamVar() + 
t.OutputStreams().SetupNew(t, output) + return output } -func (t *CloneStreamType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *CloneStreamType) GenerateOp() (exec.Op, error) { return &CloneStream{ - Raw: op.InputStreams[0].Var, - Cloneds: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { + Raw: t.InputStreams().Get(0).Var, + Cloneds: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), }, nil } -func (t *CloneStreamType) NewOutput(node *dag.Node) *dag.StreamVar { - return dag.NodeNewOutputStream(node, nil) +// func (t *CloneStreamType) String() string { +// return fmt.Sprintf("CloneStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } + +type CloneVarType struct { + dag.NodeBase } -func (t *CloneStreamType) String(node *dag.Node) string { - return fmt.Sprintf("CloneStream[]%v%v", formatStreamIO(node), formatValueIO(node)) +func (b *GraphNodeBuilder) NewCloneValue() *CloneVarType { + node := &CloneVarType{} + b.AddNode(node) + return node } -type CloneVarType struct{} +func (t *CloneVarType) SetInput(raw *dag.ValueVar) { + t.InputValues().EnsureSize(1) + raw.Connect(t, 0) +} -func (t *CloneVarType) InitNode(node *dag.Node) { - dag.NodeDeclareInputValue(node, 1) +func (t *CloneVarType) NewOutput() *dag.ValueVar { + output := t.Graph().NewValueVar(t.InputValues().Get(0).Type) + t.OutputValues().SetupNew(t, output) + return output } -func (t *CloneVarType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *CloneVarType) GenerateOp() (exec.Op, error) { return &CloneVar{ - Raw: op.InputValues[0].Var, - Cloneds: lo.Map(op.OutputValues, func(v *dag.ValueVar, idx int) exec.Var { + Raw: t.InputValues().Get(0).Var, + Cloneds: lo.Map(t.OutputValues().RawArray(), func(v *dag.ValueVar, idx int) exec.Var { return v.Var }), }, nil } -func (t *CloneVarType) NewOutput(node *dag.Node) *dag.ValueVar { - return dag.NodeNewOutputValue(node, node.InputValues[0].Type, nil) -} - -func (t 
*CloneVarType) String(node *dag.Node) string { - return fmt.Sprintf("CloneVar[]%v%v", formatStreamIO(node), formatValueIO(node)) -} +// func (t *CloneVarType) String() string { +// return fmt.Sprintf("CloneVar[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitchlrc/ops2/ec.go b/common/pkgs/ioswitchlrc/ops2/ec.go index 01ed734..358a0e3 100644 --- a/common/pkgs/ioswitchlrc/ops2/ec.go +++ b/common/pkgs/ioswitchlrc/ops2/ec.go @@ -15,7 +15,6 @@ import ( "gitlink.org.cn/cloudream/common/utils/sync2" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" "gitlink.org.cn/cloudream/storage/common/pkgs/ec/lrc" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" ) func init() { @@ -115,15 +114,43 @@ func (o *GalMultiply) String() string { ) } -type LRCConstructAnyType struct { +type LRCConstructAnyNode struct { + dag.NodeBase LRC cdssdk.LRCRedundancy InputIndexes []int OutputIndexes []int } -func (t *LRCConstructAnyType) InitNode(node *dag.Node) {} +func (b *GraphNodeBuilder) NewLRCConstructAny(lrc cdssdk.LRCRedundancy) *LRCConstructAnyNode { + node := &LRCConstructAnyNode{ + LRC: lrc, + } + b.AddNode(node) + return node +} + +func (t *LRCConstructAnyNode) AddInput(str *dag.StreamVar, dataIndex int) { + t.InputIndexes = append(t.InputIndexes, dataIndex) + idx := t.InputStreams().EnlargeOne() + str.Connect(t, idx) +} + +func (t *LRCConstructAnyNode) RemoveAllInputs() { + for i, in := range t.InputStreams().RawArray() { + in.Disconnect(t, i) + } + t.InputStreams().Resize(0) + t.InputIndexes = nil +} -func (t *LRCConstructAnyType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *LRCConstructAnyNode) NewOutput(dataIndex int) *dag.StreamVar { + t.OutputIndexes = append(t.OutputIndexes, dataIndex) + output := t.Graph().NewStreamVar() + t.OutputStreams().SetupNew(t, output) + return output +} + +func (t *LRCConstructAnyNode) GenerateOp() (exec.Op, error) { l, err := lrc.New(t.LRC.N, t.LRC.K, t.LRC.Groups) if err != nil { return nil, err 
@@ -135,50 +162,45 @@ func (t *LRCConstructAnyType) GenerateOp(op *dag.Node) (exec.Op, error) { return &GalMultiply{ Coef: coef, - Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), - Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), + Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), + Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), ChunkSize: t.LRC.ChunkSize, }, nil } -func (t *LRCConstructAnyType) AddInput(node *dag.Node, str *dag.StreamVar, dataIndex int) { - t.InputIndexes = append(t.InputIndexes, dataIndex) - node.InputStreams = append(node.InputStreams, str) - str.To(node, len(node.InputStreams)-1) -} +// func (t *LRCConstructAnyType) String() string { +// return fmt.Sprintf("LRCAny[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } -func (t *LRCConstructAnyType) RemoveAllInputs(n *dag.Node) { - for _, in := range n.InputStreams { - in.From.Node.OutputStreams[in.From.SlotIndex].NotTo(n) - } - n.InputStreams = nil - t.InputIndexes = nil -} - -func (t *LRCConstructAnyType) NewOutput(node *dag.Node, dataIndex int) *dag.StreamVar { - t.OutputIndexes = append(t.OutputIndexes, dataIndex) - return dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{StreamIndex: dataIndex}) +type LRCConstructGroupNode struct { + dag.NodeBase + LRC cdssdk.LRCRedundancy + TargetBlockIndex int } -func (t *LRCConstructAnyType) String(node *dag.Node) string { - return fmt.Sprintf("LRCAny[]%v%v", formatStreamIO(node), formatValueIO(node)) +func (b *GraphNodeBuilder) NewLRCConstructGroup(lrc cdssdk.LRCRedundancy) *LRCConstructGroupNode { + node := &LRCConstructGroupNode{ + LRC: lrc, + } + b.AddNode(node) + return node } -type LRCConstructGroupType struct { - LRC cdssdk.LRCRedundancy - TargetBlockIndex int -} +func (t *LRCConstructGroupNode) 
SetupForTarget(blockIdx int, inputs []*dag.StreamVar) *dag.StreamVar { + t.TargetBlockIndex = blockIdx -func (t *LRCConstructGroupType) InitNode(node *dag.Node) { - dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{ - StreamIndex: t.TargetBlockIndex, - }) + t.InputStreams().Resize(0) + for _, in := range inputs { + idx := t.InputStreams().EnlargeOne() + in.Connect(t, idx) + } - grpIdx := t.LRC.FindGroup(t.TargetBlockIndex) - dag.NodeDeclareInputStream(node, t.LRC.Groups[grpIdx]) + output := t.Graph().NewStreamVar() + t.OutputStreams().Setup(t, output, 0) + return output } -func (t *LRCConstructGroupType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *LRCConstructGroupNode) GenerateOp() (exec.Op, error) { l, err := lrc.New(t.LRC.N, t.LRC.K, t.LRC.Groups) if err != nil { return nil, err @@ -190,12 +212,12 @@ func (t *LRCConstructGroupType) GenerateOp(op *dag.Node) (exec.Op, error) { return &GalMultiply{ Coef: coef, - Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), - Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), + Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), + Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }), ChunkSize: t.LRC.ChunkSize, }, nil } -func (t *LRCConstructGroupType) String(node *dag.Node) string { - return fmt.Sprintf("LRCGroup[]%v%v", formatStreamIO(node), formatValueIO(node)) -} +// func (t *LRCConstructGroupType) String() string { +// return fmt.Sprintf("LRCGroup[]%v%v", formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitchlrc/ops2/ipfs.go b/common/pkgs/ioswitchlrc/ops2/ipfs.go index 3ccdc3d..f8e4e2a 100644 --- a/common/pkgs/ioswitchlrc/ops2/ipfs.go +++ b/common/pkgs/ioswitchlrc/ops2/ipfs.go @@ -12,7 +12,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" 
"gitlink.org.cn/cloudream/common/utils/io2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" ) func init() { @@ -94,44 +93,78 @@ func (o *IPFSWrite) String() string { return fmt.Sprintf("IPFSWrite %v -> %v", o.Input.ID, o.FileHash.ID) } -type IPFSReadType struct { +type IPFSReadNode struct { + dag.NodeBase FileHash string Option ipfs.ReadOption } -func (t *IPFSReadType) InitNode(node *dag.Node) { - dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{}) +func (b *GraphNodeBuilder) NewIPFSRead(fileHash string, option ipfs.ReadOption) *IPFSReadNode { + node := &IPFSReadNode{ + FileHash: fileHash, + Option: option, + } + b.AddNode(node) + node.OutputStreams().SetupNew(node, b.NewStreamVar()) + return node +} + +func (t *IPFSReadNode) Output() dag.StreamSlot { + return dag.StreamSlot{ + Var: t.OutputStreams().Get(0), + Index: 0, + } } -func (t *IPFSReadType) GenerateOp(n *dag.Node) (exec.Op, error) { +func (t *IPFSReadNode) GenerateOp() (exec.Op, error) { return &IPFSRead{ - Output: n.OutputStreams[0].Var, + Output: t.OutputStreams().Get(0).Var, FileHash: t.FileHash, Option: t.Option, }, nil } -func (t *IPFSReadType) String(node *dag.Node) string { - return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node)) -} +// func (t *IPFSReadType) String() string { +// return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node)) +// } -type IPFSWriteType struct { +type IPFSWriteNode struct { + dag.NodeBase FileHashStoreKey string - Range exec.Range } -func (t *IPFSWriteType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) - dag.NodeNewOutputValue(node, dag.StringValueVar, &ioswitchlrc.VarProps{}) +func (b *GraphNodeBuilder) NewIPFSWrite(fileHashStoreKey string) *IPFSWriteNode { + node := &IPFSWriteNode{ + FileHashStoreKey: fileHashStoreKey, + } + 
b.AddNode(node) + return node +} + +func (t *IPFSWriteNode) SetInput(input *dag.StreamVar) { + t.InputStreams().EnsureSize(1) + input.Connect(t, 0) + t.OutputValues().SetupNew(t, t.Graph().NewValueVar(dag.StringValueVar)) +} + +func (t *IPFSWriteNode) Input() dag.StreamSlot { + return dag.StreamSlot{ + Var: t.InputStreams().Get(0), + Index: 0, + } } -func (t *IPFSWriteType) GenerateOp(op *dag.Node) (exec.Op, error) { +func (t *IPFSWriteNode) FileHashVar() *dag.ValueVar { + return t.OutputValues().Get(0) +} + +func (t *IPFSWriteNode) GenerateOp() (exec.Op, error) { return &IPFSWrite{ - Input: op.InputStreams[0].Var, - FileHash: op.OutputValues[0].Var.(*exec.StringVar), + Input: t.InputStreams().Get(0).Var, + FileHash: t.OutputValues().Get(0).Var.(*exec.StringVar), }, nil } -func (t *IPFSWriteType) String(node *dag.Node) string { - return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) -} +// func (t *IPFSWriteType) String() string { +// return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitchlrc/ops2/ops.go b/common/pkgs/ioswitchlrc/ops2/ops.go index c7309b6..a41ec08 100644 --- a/common/pkgs/ioswitchlrc/ops2/ops.go +++ b/common/pkgs/ioswitchlrc/ops2/ops.go @@ -1,75 +1,93 @@ package ops2 import ( - "fmt" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops" ) -func formatStreamIO(node *dag.Node) string { - is := "" - for i, in := range node.InputStreams { - if i > 0 { - is += "," - } - - if in == nil { - is += "." - } else { - is += fmt.Sprintf("%v", in.ID) - } - } - - os := "" - for i, out := range node.OutputStreams { - if i > 0 { - os += "," - } - - if out == nil { - os += "." 
- } else { - os += fmt.Sprintf("%v", out.ID) - } - } - - if is == "" && os == "" { - return "" - } - - return fmt.Sprintf("S{%s>%s}", is, os) +type GraphNodeBuilder struct { + *ops.GraphNodeBuilder +} + +func NewGraphNodeBuilder() *GraphNodeBuilder { + return &GraphNodeBuilder{ops.NewGraphNodeBuilder()} +} + +type FromNode interface { + dag.Node + Output() dag.StreamSlot } -func formatValueIO(node *dag.Node) string { - is := "" - for i, in := range node.InputValues { - if i > 0 { - is += "," - } - - if in == nil { - is += "." - } else { - is += fmt.Sprintf("%v", in.ID) - } - } - - os := "" - for i, out := range node.OutputValues { - if i > 0 { - os += "," - } - - if out == nil { - os += "." - } else { - os += fmt.Sprintf("%v", out.ID) - } - } - - if is == "" && os == "" { - return "" - } - - return fmt.Sprintf("V{%s>%s}", is, os) +type ToNode interface { + dag.Node + Input() dag.StreamSlot + SetInput(input *dag.StreamVar) } + +// func formatStreamIO(node *dag.Node) string { +// is := "" +// for i, in := range node.InputStreams { +// if i > 0 { +// is += "," +// } + +// if in == nil { +// is += "." +// } else { +// is += fmt.Sprintf("%v", in.ID) +// } +// } + +// os := "" +// for i, out := range node.OutputStreams { +// if i > ops +// os += "," +// } + +// if out == nil { +// os += "." +// } else { +// os += fmt.Sprintf("%v", out.ID) +// } +// } + +// if is == "" && os == "" { +// return "" +// } + +// return fmt.Sprintf("S{%s>%s}", is, os) +// } + +// func formatValueIO(node *dag.Node) string { +// is := "" +// for i, in := range node.InputValues { +// if i > 0 { +// is += "," +// } + +// if in == nil { +// is += "." +// } else { +// is += fmt.Sprintf("%v", in.ID) +// } +// } + +// os := "" +// for i, out := range node.OutputValues { +// if i > 0 { +// os += "," +// } + +// if out == nil { +// os += "." 
+// } else { +// os += fmt.Sprintf("%v", out.ID) +// } +// } + +// if is == "" && os == "" { +// return "" +// } + +// return fmt.Sprintf("V{%s>%s}", is, os) +// } diff --git a/common/pkgs/ioswitchlrc/ops2/range.go b/common/pkgs/ioswitchlrc/ops2/range.go index 5d97155..1475b4e 100644 --- a/common/pkgs/ioswitchlrc/ops2/range.go +++ b/common/pkgs/ioswitchlrc/ops2/range.go @@ -10,7 +10,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/math2" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" ) func init() { @@ -76,24 +75,35 @@ func (o *Range) String() string { return fmt.Sprintf("Range(%v+%v) %v -> %v", o.Offset, o.Length, o.Input.ID, o.Output.ID) } -type RangeType struct { +type RangeNode struct { + dag.NodeBase Range exec.Range } -func (t *RangeType) InitNode(node *dag.Node) { - dag.NodeDeclareInputStream(node, 1) - dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{}) +func (b *GraphNodeBuilder) NewRange() *RangeNode { + node := &RangeNode{} + b.AddNode(node) + return node } -func (t *RangeType) GenerateOp(n *dag.Node) (exec.Op, error) { +func (t *RangeNode) RangeStream(input *dag.StreamVar, rng exec.Range) *dag.StreamVar { + t.InputStreams().EnsureSize(1) + input.Connect(t, 0) + t.Range = rng + output := t.Graph().NewStreamVar() + t.OutputStreams().Setup(t, output, 0) + return output +} + +func (t *RangeNode) GenerateOp() (exec.Op, error) { return &Range{ - Input: n.InputStreams[0].Var, - Output: n.OutputStreams[0].Var, + Input: t.InputStreams().Get(0).Var, + Output: t.OutputStreams().Get(0).Var, Offset: t.Range.Offset, Length: t.Range.Length, }, nil } -func (t *RangeType) String(node *dag.Node) string { - return fmt.Sprintf("Range[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) -} +// func (t *RangeType) String() string { +// return fmt.Sprintf("Range[%v+%v]%v%v", t.Range.Offset, t.Range.Length, 
formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitchlrc/parser/generator.go b/common/pkgs/ioswitchlrc/parser/generator.go index cabe84b..33593a3 100644 --- a/common/pkgs/ioswitchlrc/parser/generator.go +++ b/common/pkgs/ioswitchlrc/parser/generator.go @@ -13,8 +13,9 @@ import ( type GenerateContext struct { LRC cdssdk.LRCRedundancy - DAG *dag.Graph - Toes []ioswitchlrc.To + DAG *ops2.GraphNodeBuilder + To []ioswitchlrc.To + ToNodes map[ioswitchlrc.To]ops2.ToNode StreamRange exec.Range } @@ -25,9 +26,10 @@ func Encode(fr ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) } ctx := GenerateContext{ - LRC: cdssdk.DefaultLRCRedundancy, - DAG: dag.NewGraph(), - Toes: toes, + LRC: cdssdk.DefaultLRCRedundancy, + DAG: ops2.NewGraphNodeBuilder(), + To: toes, + ToNodes: make(map[ioswitchlrc.To]ops2.ToNode), } calcStreamRange(&ctx) @@ -46,7 +48,7 @@ func Encode(fr ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) generateClone(&ctx) generateRange(&ctx) - return plan.Generate(ctx.DAG, blder) + return plan.Generate(ctx.DAG.Graph, blder) } func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlrc.To) error { @@ -66,9 +68,9 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr if err != nil { return fmt.Errorf("building to node: %w", err) } + ctx.ToNodes[to] = toNode - frNode.OutputStreams[0].To(toNode, 0) - + toNode.SetInput(frNode.Output().Var) } else if idx < ctx.LRC.K { dataToes = append(dataToes, to) } else { @@ -81,19 +83,17 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr } // 需要文件块,则生成Split指令 - splitNode := ctx.DAG.NewNode(&ops2.ChunkedSplitType{ - OutputCount: ctx.LRC.K, - ChunkSize: ctx.LRC.ChunkSize, - }, &ioswitchlrc.NodeProps{}) - frNode.OutputStreams[0].To(splitNode, 0) + splitNode := ctx.DAG.NewChunkedSplit(ctx.LRC.ChunkSize) + splitNode.Split(frNode.Output().Var, ctx.LRC.K) for _, to := range dataToes { toNode, err := 
buildToNode(ctx, to) if err != nil { return fmt.Errorf("building to node: %w", err) } + ctx.ToNodes[to] = toNode - splitNode.OutputStreams[to.GetDataIndex()].To(toNode, 0) + toNode.SetInput(splitNode.SubStream(to.GetDataIndex())) } if len(parityToes) == 0 { @@ -102,12 +102,10 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr // 需要校验块,则进一步生成Construct指令 - conNode, conType := dag.NewNode(ctx.DAG, &ops2.LRCConstructAnyType{ - LRC: ctx.LRC, - }, &ioswitchlrc.NodeProps{}) + conType := ctx.DAG.NewLRCConstructAny(ctx.LRC) - for _, out := range splitNode.OutputStreams { - conType.AddInput(conNode, out, ioswitchlrc.SProps(out).StreamIndex) + for i, out := range splitNode.OutputStreams().RawArray() { + conType.AddInput(out, i) } for _, to := range parityToes { @@ -115,8 +113,9 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr if err != nil { return fmt.Errorf("building to node: %w", err) } + ctx.ToNodes[to] = toNode - conType.NewOutput(conNode, to.GetDataIndex()).To(toNode, 0) + toNode.SetInput(conType.NewOutput(to.GetDataIndex())) } return nil } @@ -124,9 +123,10 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr // 提供数据块+编码块中的k个块,重建任意块,包括完整文件。 func ReconstructAny(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) error { ctx := GenerateContext{ - LRC: cdssdk.DefaultLRCRedundancy, - DAG: dag.NewGraph(), - Toes: toes, + LRC: cdssdk.DefaultLRCRedundancy, + DAG: ops2.NewGraphNodeBuilder(), + To: toes, + ToNodes: make(map[ioswitchlrc.To]ops2.ToNode), } calcStreamRange(&ctx) @@ -145,11 +145,11 @@ func ReconstructAny(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.P generateClone(&ctx) generateRange(&ctx) - return plan.Generate(ctx.DAG, blder) + return plan.Generate(ctx.DAG.Graph, blder) } func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes []ioswitchlrc.To) error { - frNodes := make(map[int]*dag.Node) + frNodes := 
make(map[int]ops2.FromNode) for _, fr := range frs { frNode, err := buildFromNode(ctx, fr) if err != nil { @@ -167,12 +167,13 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [ toIdx := to.GetDataIndex() fr := frNodes[toIdx] if fr != nil { - node, err := buildToNode(ctx, to) + toNode, err := buildToNode(ctx, to) if err != nil { return fmt.Errorf("building to node: %w", err) } + ctx.ToNodes[to] = toNode - fr.OutputStreams[0].To(node, 0) + toNode.SetInput(fr.Output().Var) continue } @@ -189,12 +190,9 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [ // 生成Construct指令来恢复缺少的块 - conNode, conType := dag.NewNode(ctx.DAG, &ops2.LRCConstructAnyType{ - LRC: ctx.LRC, - }, &ioswitchlrc.NodeProps{}) - - for _, fr := range frNodes { - conType.AddInput(conNode, fr.OutputStreams[0], ioswitchlrc.SProps(fr.OutputStreams[0]).StreamIndex) + conNode := ctx.DAG.NewLRCConstructAny(ctx.LRC) + for i, fr := range frNodes { + conNode.AddInput(fr.Output().Var, i) } for _, to := range missedToes { @@ -202,8 +200,9 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [ if err != nil { return fmt.Errorf("building to node: %w", err) } + ctx.ToNodes[to] = toNode - conType.NewOutput(conNode, to.GetDataIndex()).To(toNode, 0) + toNode.SetInput(conNode.NewOutput(to.GetDataIndex())) } if len(completeToes) == 0 { @@ -212,17 +211,14 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [ // 需要完整文件,则生成Join指令 - joinNode := ctx.DAG.NewNode(&ops2.ChunkedJoinType{ - InputCount: ctx.LRC.K, - ChunkSize: ctx.LRC.ChunkSize, - }, &ioswitchlrc.NodeProps{}) + joinNode := ctx.DAG.NewChunkedJoin(ctx.LRC.ChunkSize) for i := 0; i < ctx.LRC.K; i++ { - n := frNodes[i] - if n == nil { - conType.NewOutput(conNode, i).To(joinNode, i) + fr := frNodes[i] + if fr == nil { + joinNode.AddInput(conNode.NewOutput(i)) } else { - n.OutputStreams[0].To(joinNode, i) + joinNode.AddInput(fr.Output().Var) } } @@ 
-231,13 +227,14 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [ if err != nil { return fmt.Errorf("building to node: %w", err) } + ctx.ToNodes[to] = toNode - joinNode.OutputStreams[0].To(toNode, 0) + toNode.SetInput(joinNode.Joined()) } // 如果不需要Construct任何块,则删除这个节点 - if len(conNode.OutputStreams) == 0 { - conType.RemoveAllInputs(conNode) + if conNode.OutputStreams().Len() == 0 { + conNode.RemoveAllInputs() ctx.DAG.RemoveNode(conNode) } @@ -247,9 +244,10 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [ // 输入同一组的多个块,恢复出剩下缺少的一个块。 func ReconstructGroup(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) error { ctx := GenerateContext{ - LRC: cdssdk.DefaultLRCRedundancy, - DAG: dag.NewGraph(), - Toes: toes, + LRC: cdssdk.DefaultLRCRedundancy, + DAG: ops2.NewGraphNodeBuilder(), + To: toes, + ToNodes: make(map[ioswitchlrc.To]ops2.ToNode), } calcStreamRange(&ctx) @@ -268,33 +266,33 @@ func ReconstructGroup(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec generateClone(&ctx) generateRange(&ctx) - return plan.Generate(ctx.DAG, blder) + return plan.Generate(ctx.DAG.Graph, blder) } func buildDAGReconstructGroup(ctx *GenerateContext, frs []ioswitchlrc.From, toes []ioswitchlrc.To) error { - missedGrpIdx := toes[0].GetDataIndex() - conNode := ctx.DAG.NewNode(&ops2.LRCConstructGroupType{ - LRC: ctx.LRC, - TargetBlockIndex: missedGrpIdx, - }, &ioswitchlrc.NodeProps{}) - - for i, fr := range frs { + var inputs []*dag.StreamVar + for _, fr := range frs { frNode, err := buildFromNode(ctx, fr) if err != nil { return fmt.Errorf("building from node: %w", err) } - frNode.OutputStreams[0].To(conNode, i) + inputs = append(inputs, frNode.Output().Var) } + missedGrpIdx := toes[0].GetDataIndex() + conNode := ctx.DAG.NewLRCConstructGroup(ctx.LRC) + missedBlk := conNode.SetupForTarget(missedGrpIdx, inputs) + for _, to := range toes { toNode, err := buildToNode(ctx, to) if err != nil { return 
fmt.Errorf("building to node: %w", err) } + ctx.ToNodes[to] = toNode - conNode.OutputStreams[0].To(toNode, 0) + toNode.SetInput(missedBlk) } return nil diff --git a/common/pkgs/ioswitchlrc/parser/passes.go b/common/pkgs/ioswitchlrc/parser/passes.go index d59f54f..4a5ba37 100644 --- a/common/pkgs/ioswitchlrc/parser/passes.go +++ b/common/pkgs/ioswitchlrc/parser/passes.go @@ -6,7 +6,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops" "gitlink.org.cn/cloudream/common/pkgs/ipfs" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" @@ -21,7 +20,7 @@ func calcStreamRange(ctx *GenerateContext) { Offset: math.MaxInt64, } - for _, to := range ctx.Toes { + for _, to := range ctx.To { if to.GetDataIndex() == -1 { toRng := to.GetRange() rng.ExtendStart(math2.Floor(toRng.Offset, stripSize)) @@ -48,7 +47,7 @@ func calcStreamRange(ctx *GenerateContext) { ctx.StreamRange = rng } -func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (*dag.Node, error) { +func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (ops2.FromNode, error) { var repRange exec.Range var blkRange exec.Range @@ -64,16 +63,10 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (*dag.Node, error) switch f := f.(type) { case *ioswitchlrc.FromNode: - n, t := dag.NewNode(ctx.DAG, &ops2.IPFSReadType{ - FileHash: f.FileHash, - Option: ipfs.ReadOption{ - Offset: 0, - Length: -1, - }, - }, &ioswitchlrc.NodeProps{ - From: f, + t := ctx.DAG.NewIPFSRead(f.FileHash, ipfs.ReadOption{ + Offset: 0, + Length: -1, }) - ioswitchlrc.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex if f.DataIndex == -1 { t.Option.Offset = repRange.Offset @@ -88,17 +81,16 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (*dag.Node, error) } if f.Node != nil { - n.Env.ToEnvWorker(&ioswitchlrc.AgentWorker{Node: *f.Node}) - 
n.Env.Pinned = true + t.Env().ToEnvWorker(&ioswitchlrc.AgentWorker{Node: *f.Node}) + t.Env().Pinned = true } - return n, nil + return t, nil case *ioswitchlrc.FromDriver: - n, _ := dag.NewNode(ctx.DAG, &ops.FromDriverType{Handle: f.Handle}, &ioswitchlrc.NodeProps{From: f}) - n.Env.ToEnvDriver() - n.Env.Pinned = true - ioswitchlrc.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex + n := ctx.DAG.NewFromDriver(f.Handle) + n.Env().ToEnvDriver() + n.Env().Pinned = true if f.DataIndex == -1 { f.Handle.RangeHint.Offset = repRange.Offset @@ -115,24 +107,19 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (*dag.Node, error) } } -func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (*dag.Node, error) { +func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (ops2.ToNode, error) { switch t := t.(type) { case *ioswitchlrc.ToNode: - n, _ := dag.NewNode(ctx.DAG, &ops2.IPFSWriteType{ - FileHashStoreKey: t.FileHashStoreKey, - Range: t.Range, - }, &ioswitchlrc.NodeProps{ - To: t, - }) - n.Env.ToEnvWorker(&ioswitchlrc.AgentWorker{Node: t.Node}) - n.Env.Pinned = true + n := ctx.DAG.NewIPFSWrite(t.FileHashStoreKey) + n.Env().ToEnvWorker(&ioswitchlrc.AgentWorker{Node: t.Node}) + n.Env().Pinned = true return n, nil case *ioswitchlrc.ToDriver: - n, _ := dag.NewNode(ctx.DAG, &ops.ToDriverType{Handle: t.Handle, Range: t.Range}, &ioswitchlrc.NodeProps{To: t}) - n.Env.ToEnvDriver() - n.Env.Pinned = true + n := ctx.DAG.NewToDriver(t.Handle) + n.Env().ToEnvDriver() + n.Env().Pinned = true return n, nil @@ -146,21 +133,21 @@ func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (*dag.Node, error) { // 所以理论上不会出现有指令的位置始终无法确定的情况。 func pin(ctx *GenerateContext) bool { changed := false - ctx.DAG.Walk(func(node *dag.Node) bool { - if node.Env.Pinned { + ctx.DAG.Walk(func(node dag.Node) bool { + if node.Env().Pinned { return true } var toEnv *dag.NodeEnv - for _, out := range node.OutputStreams { - for _, to := range out.Toes { - if to.Node.Env.Type == dag.EnvUnknown { + for 
_, out := range node.OutputStreams().RawArray() { + for _, to := range out.To().RawArray() { + if to.Node.Env().Type == dag.EnvUnknown { continue } if toEnv == nil { - toEnv = &to.Node.Env - } else if !toEnv.Equals(to.Node.Env) { + toEnv = to.Node.Env() + } else if !toEnv.Equals(to.Node.Env()) { toEnv = nil break } @@ -168,35 +155,35 @@ func pin(ctx *GenerateContext) bool { } if toEnv != nil { - if !node.Env.Equals(*toEnv) { + if !node.Env().Equals(toEnv) { changed = true } - node.Env = *toEnv + *node.Env() = *toEnv return true } // 否则根据输入流的始发地来固定 var fromEnv *dag.NodeEnv - for _, in := range node.InputStreams { - if in.From.Node.Env.Type == dag.EnvUnknown { + for _, in := range node.InputStreams().RawArray() { + if in.From().Node.Env().Type == dag.EnvUnknown { continue } if fromEnv == nil { - fromEnv = &in.From.Node.Env - } else if !fromEnv.Equals(in.From.Node.Env) { + fromEnv = in.From().Node.Env() + } else if !fromEnv.Equals(in.From().Node.Env()) { fromEnv = nil break } } if fromEnv != nil { - if !node.Env.Equals(*fromEnv) { + if !node.Env().Equals(fromEnv) { changed = true } - node.Env = *fromEnv + *node.Env() = *fromEnv } return true }) @@ -206,12 +193,12 @@ func pin(ctx *GenerateContext) bool { // 对于所有未使用的流,增加Drop指令 func dropUnused(ctx *GenerateContext) { - ctx.DAG.Walk(func(node *dag.Node) bool { - for _, out := range node.OutputStreams { - if len(out.Toes) == 0 { - n := ctx.DAG.NewNode(&ops.DropType{}, &ioswitchlrc.NodeProps{}) - n.Env = node.Env - out.To(n, 0) + ctx.DAG.Walk(func(node dag.Node) bool { + for _, out := range node.OutputStreams().RawArray() { + if out.To().Len() == 0 { + n := ctx.DAG.NewDropStream() + *n.Env() = *node.Env() + n.SetInput(out) } } return true @@ -220,44 +207,38 @@ func dropUnused(ctx *GenerateContext) { // 为IPFS写入指令存储结果 func storeIPFSWriteResult(ctx *GenerateContext) { - dag.WalkOnlyType[*ops2.IPFSWriteType](ctx.DAG, func(node *dag.Node, typ *ops2.IPFSWriteType) bool { - if typ.FileHashStoreKey == "" { + 
dag.WalkOnlyType[*ops2.IPFSWriteNode](ctx.DAG.Graph, func(n *ops2.IPFSWriteNode) bool { + if n.FileHashStoreKey == "" { return true } - n, t := dag.NewNode(ctx.DAG, &ops.StoreType{ - StoreKey: typ.FileHashStoreKey, - }, &ioswitchlrc.NodeProps{}) - n.Env.ToEnvDriver() + storeNode := ctx.DAG.NewStore() + storeNode.Env().ToEnvDriver() - t.Store(n, node.OutputValues[0]) + storeNode.Store(n.FileHashStoreKey, n.FileHashVar()) return true }) } // 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回 func generateRange(ctx *GenerateContext) { - ctx.DAG.Walk(func(node *dag.Node) bool { - props := ioswitchlrc.NProps(node) - if props.To == nil { - return true - } + for i := 0; i < len(ctx.To); i++ { + to := ctx.To[i] + toNode := ctx.ToNodes[to] - toDataIdx := props.To.GetDataIndex() - toRng := props.To.GetRange() + toDataIdx := to.GetDataIndex() + toRng := to.GetRange() if toDataIdx == -1 { - n := ctx.DAG.NewNode(&ops2.RangeType{ - Range: exec.Range{ - Offset: toRng.Offset - ctx.StreamRange.Offset, - Length: toRng.Length, - }, - }, &ioswitchlrc.NodeProps{}) - n.Env = node.InputStreams[0].From.Node.Env - - node.InputStreams[0].To(n, 0) - node.InputStreams[0].NotTo(node) - n.OutputStreams[0].To(node, 0) + n := ctx.DAG.NewRange() + toInput := toNode.Input() + *n.Env() = *toInput.Var.From().Node.Env() + rnged := n.RangeStream(toInput.Var, exec.Range{ + Offset: toRng.Offset - ctx.StreamRange.Offset, + Length: toRng.Length, + }) + toInput.Var.Disconnect(toNode, toInput.Index) + toNode.SetInput(rnged) } else { stripSize := int64(ctx.LRC.ChunkSize * ctx.LRC.K) @@ -265,54 +246,48 @@ func generateRange(ctx *GenerateContext) { blkStart := blkStartIdx * int64(ctx.LRC.ChunkSize) - n := ctx.DAG.NewNode(&ops2.RangeType{ - Range: exec.Range{ - Offset: toRng.Offset - blkStart, - Length: toRng.Length, - }, - }, &ioswitchlrc.NodeProps{}) - n.Env = node.InputStreams[0].From.Node.Env - - node.InputStreams[0].To(n, 0) - node.InputStreams[0].NotTo(node) - n.OutputStreams[0].To(node, 0) + n := 
ctx.DAG.NewRange() + toInput := toNode.Input() + *n.Env() = *toInput.Var.From().Node.Env() + rnged := n.RangeStream(toInput.Var, exec.Range{ + Offset: toRng.Offset - blkStart, + Length: toRng.Length, + }) + toInput.Var.Disconnect(toNode, toInput.Index) + toNode.SetInput(rnged) } - - return true - }) + } } // 生成Clone指令 func generateClone(ctx *GenerateContext) { - ctx.DAG.Walk(func(node *dag.Node) bool { - for _, out := range node.OutputStreams { - if len(out.Toes) <= 1 { + ctx.DAG.Walk(func(node dag.Node) bool { + for _, out := range node.OutputStreams().RawArray() { + if out.To().Len() <= 1 { continue } - n, t := dag.NewNode(ctx.DAG, &ops2.CloneStreamType{}, &ioswitchlrc.NodeProps{}) - n.Env = node.Env - for _, to := range out.Toes { - str := t.NewOutput(n) - str.Props = &ioswitchlrc.VarProps{StreamIndex: ioswitchlrc.SProps(out).StreamIndex} - str.To(to.Node, to.SlotIndex) + t := ctx.DAG.NewCloneStream() + *t.Env() = *node.Env() + for _, to := range out.To().RawArray() { + t.NewOutput().Connect(to.Node, to.SlotIndex) } - out.Toes = nil - out.To(n, 0) + out.To().Resize(0) + t.SetInput(out) } - for _, out := range node.OutputValues { - if len(out.Toes) <= 1 { + for _, out := range node.OutputValues().RawArray() { + if out.To().Len() <= 1 { continue } - n, t := dag.NewNode(ctx.DAG, &ops2.CloneVarType{}, &ioswitchlrc.NodeProps{}) - n.Env = node.Env - for _, to := range out.Toes { - t.NewOutput(node).To(to.Node, to.SlotIndex) + t := ctx.DAG.NewCloneValue() + *t.Env() = *node.Env() + for _, to := range out.To().RawArray() { + t.NewOutput().Connect(to.Node, to.SlotIndex) } - out.Toes = nil - out.To(n, 0) + out.To().Resize(0) + t.SetInput(out) } return true diff --git a/common/pkgs/ioswitchlrc/utils.go b/common/pkgs/ioswitchlrc/utils.go deleted file mode 100644 index aa162ee..0000000 --- a/common/pkgs/ioswitchlrc/utils.go +++ /dev/null @@ -1,17 +0,0 @@ -package ioswitchlrc - -import ( - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" -) - -func NProps(n *dag.Node) 
*NodeProps { - return dag.NProps[*NodeProps](n) -} - -func SProps(str *dag.StreamVar) *VarProps { - return dag.SProps[*VarProps](str) -} - -func VProps(v *dag.ValueVar) *VarProps { - return dag.VProps[*VarProps](v) -} diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index d80cc37..b4820ca 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -23,6 +23,8 @@ type PackageService interface { GetPackageCachedNodes(msg *GetPackageCachedNodes) (*GetPackageCachedNodesResp, *mq.CodeMessage) GetPackageLoadedNodes(msg *GetPackageLoadedNodes) (*GetPackageLoadedNodesResp, *mq.CodeMessage) + + AddPackageAccessStatCounter(msg *AddPackageAccessStatCounter) (*AddPackageAccessStatCounterResp, *mq.CodeMessage) } // 获取Package基本信息 @@ -254,3 +256,34 @@ func NewGetPackageLoadedNodesResp(nodeIDs []cdssdk.NodeID) *GetPackageLoadedNode func (client *Client) GetPackageLoadedNodes(msg *GetPackageLoadedNodes) (*GetPackageLoadedNodesResp, error) { return mq.Request(Service.GetPackageLoadedNodes, client.rabbitCli, msg) } + +// 更新Pacakge访问统计中的计数值 +var _ = Register(Service.AddPackageAccessStatCounter) + +type AddPackageAccessStatCounter struct { + mq.MessageBodyBase + Entries []AddPackageAccessStatCounterEntry `json:"entries"` +} +type AddPackageAccessStatCounterEntry struct { + PackageID cdssdk.PackageID `json:"packageID" db:"PackageID"` + NodeID cdssdk.NodeID `json:"nodeID" db:"NodeID"` + Value float64 `json:"value" db:"Value"` +} + +type AddPackageAccessStatCounterResp struct { + mq.MessageBodyBase +} + +func NewAddPackageAccessStatCounter(entries []AddPackageAccessStatCounterEntry) *AddPackageAccessStatCounter { + return &AddPackageAccessStatCounter{ + Entries: entries, + } +} + +func NewAddPackageAccessStatCounterResp() *AddPackageAccessStatCounterResp { + return &AddPackageAccessStatCounterResp{} +} + +func (client *Client) AddPackageAccessStatCounter(msg *AddPackageAccessStatCounter) 
(*AddPackageAccessStatCounterResp, error) { + return mq.Request(Service.AddPackageAccessStatCounter, client.rabbitCli, msg) +} diff --git a/common/pkgs/mq/coordinator/storage.go b/common/pkgs/mq/coordinator/storage.go index a35a953..bc6bfac 100644 --- a/common/pkgs/mq/coordinator/storage.go +++ b/common/pkgs/mq/coordinator/storage.go @@ -1,8 +1,6 @@ package coordinator import ( - "time" - "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" @@ -15,8 +13,6 @@ type StorageService interface { GetStorageByName(msg *GetStorageByName) (*GetStorageByNameResp, *mq.CodeMessage) StoragePackageLoaded(msg *StoragePackageLoaded) (*StoragePackageLoadedResp, *mq.CodeMessage) - - GetPackageLoadLogDetails(msg *GetPackageLoadLogDetails) (*GetPackageLoadLogDetailsResp, *mq.CodeMessage) } // 获取Storage信息 @@ -102,34 +98,3 @@ func NewStoragePackageLoadedResp() *StoragePackageLoadedResp { func (client *Client) StoragePackageLoaded(msg *StoragePackageLoaded) (*StoragePackageLoadedResp, error) { return mq.Request(Service.StoragePackageLoaded, client.rabbitCli, msg) } - -// 查询Package的导入记录 -var _ = Register(Service.GetPackageLoadLogDetails) - -type GetPackageLoadLogDetails struct { - mq.MessageBodyBase - PackageID cdssdk.PackageID `json:"packageID"` -} -type GetPackageLoadLogDetailsResp struct { - mq.MessageBodyBase - Logs []PackageLoadLogDetail `json:"logs"` -} -type PackageLoadLogDetail struct { - Storage model.Storage `json:"storage"` - UserID cdssdk.UserID `json:"userID"` - CreateTime time.Time `json:"createTime"` -} - -func ReqGetPackageLoadLogDetails(packageID cdssdk.PackageID) *GetPackageLoadLogDetails { - return &GetPackageLoadLogDetails{ - PackageID: packageID, - } -} -func RespGetPackageLoadLogDetails(logs []PackageLoadLogDetail) *GetPackageLoadLogDetailsResp { - return &GetPackageLoadLogDetailsResp{ - Logs: logs, - } -} -func (client *Client) GetPackageLoadLogDetails(msg 
*GetPackageLoadLogDetails) (*GetPackageLoadLogDetailsResp, error) { - return mq.Request(Service.GetPackageLoadLogDetails, client.rabbitCli, msg) -} diff --git a/common/pkgs/mq/scanner/event/update_package_access_stat_amount.go b/common/pkgs/mq/scanner/event/update_package_access_stat_amount.go new file mode 100644 index 0000000..e7fade6 --- /dev/null +++ b/common/pkgs/mq/scanner/event/update_package_access_stat_amount.go @@ -0,0 +1,18 @@ +package event + +import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + +type UpdatePackageAccessStatAmount struct { + EventBase + PackageIDs []cdssdk.PackageID `json:"packageIDs"` +} + +func NewUpdatePackageAccessStatAmount(packageIDs []cdssdk.PackageID) *UpdatePackageAccessStatAmount { + return &UpdatePackageAccessStatAmount{ + PackageIDs: packageIDs, + } +} + +func init() { + Register[*UpdatePackageAccessStatAmount]() +} diff --git a/common/pkgs/package_stat/config.go b/common/pkgs/package_stat/config.go new file mode 100644 index 0000000..8d3f6d8 --- /dev/null +++ b/common/pkgs/package_stat/config.go @@ -0,0 +1,7 @@ +package packagestat + +import "time" + +type Config struct { + ReportInterval time.Duration +} diff --git a/common/pkgs/package_stat/package_stat.go b/common/pkgs/package_stat/package_stat.go new file mode 100644 index 0000000..c430325 --- /dev/null +++ b/common/pkgs/package_stat/package_stat.go @@ -0,0 +1,98 @@ +package packagestat + +import ( + "fmt" + "sync" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/sync2" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" +) + +type PackageStatEvent interface{} + +type amountKey struct { + PackageID cdssdk.PackageID + NodeID cdssdk.NodeID +} + +type amount struct { + Counter float64 +} + +type PackageStat struct { + cfg Config + amounts map[amountKey]*amount + lock sync.Mutex +} 
+ +func NewPackageStat(cfg Config) *PackageStat { + return &PackageStat{ + cfg: cfg, + amounts: make(map[amountKey]*amount), + } +} + +func (p *PackageStat) AddAccessCounter(pkgID cdssdk.PackageID, nodeID cdssdk.NodeID, value float64) { + p.lock.Lock() + defer p.lock.Unlock() + + key := amountKey{ + PackageID: pkgID, + NodeID: nodeID, + } + if _, ok := p.amounts[key]; !ok { + p.amounts[key] = &amount{} + } + p.amounts[key].Counter += value +} + +func (p *PackageStat) Start() *sync2.UnboundChannel[PackageStatEvent] { + ch := sync2.NewUnboundChannel[PackageStatEvent]() + + go func() { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + ch.Send(fmt.Errorf("new coordinator client: %w", err)) + return + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + ticker := time.NewTicker(p.cfg.ReportInterval) + for { + <-ticker.C + + p.lock.Lock() + amts := p.amounts + p.amounts = make(map[amountKey]*amount) + + var addEntries []coormq.AddPackageAccessStatCounterEntry + for key, amount := range amts { + addEntries = append(addEntries, coormq.AddPackageAccessStatCounterEntry{ + PackageID: key.PackageID, + NodeID: key.NodeID, + Value: amount.Counter, + }) + } + p.lock.Unlock() + + _, err := coorCli.AddPackageAccessStatCounter(coormq.NewAddPackageAccessStatCounter(addEntries)) + if err != nil { + logger.Errorf("add all package access stat counter: %v", err) + + p.lock.Lock() + for key, a := range amts { + if _, ok := p.amounts[key]; !ok { + p.amounts[key] = &amount{} + } + p.amounts[key].Counter += a.Counter + } + p.lock.Unlock() + continue + } + } + }() + return ch +} diff --git a/coordinator/internal/mq/package.go b/coordinator/internal/mq/package.go index b83c9d1..9ed43c9 100644 --- a/coordinator/internal/mq/package.go +++ b/coordinator/internal/mq/package.go @@ -10,7 +10,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" -
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -129,6 +128,13 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack Warnf("deleting unused package: %w", err.Error()) } + err = svc.db.PackageAccessStat().DeleteByPackageID(tx, msg.PackageID) + if err != nil { + logger.WithField("UserID", msg.UserID). + WithField("PackageID", msg.PackageID). + Warnf("deleting package access stat: %s", err.Error()) + } + return nil }) if err != nil { @@ -215,36 +221,13 @@ func (svc *Service) GetPackageLoadedNodes(msg *coormq.GetPackageLoadedNodes) (*c return mq.ReplyOK(coormq.NewGetPackageLoadedNodesResp(nodeIDs)) } -func (svc *Service) GetPackageLoadLogDetails(msg *coormq.GetPackageLoadLogDetails) (*coormq.GetPackageLoadLogDetailsResp, *mq.CodeMessage) { - var logs []coormq.PackageLoadLogDetail - rawLogs, err := svc.db.StoragePackageLog().GetByPackageID(svc.db.SQLCtx(), msg.PackageID) +func (svc *Service) AddPackageAccessStatCounter(msg *coormq.AddPackageAccessStatCounter) (*coormq.AddPackageAccessStatCounterResp, *mq.CodeMessage) { + err := svc.db.PackageAccessStat().BatchAddCounter(svc.db.SQLCtx(), msg.Entries) if err != nil { - logger.WithField("PackageID", msg.PackageID). - Warnf("getting storage package log: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "get storage package log failed") - } - - stgs := make(map[cdssdk.StorageID]model.Storage) - - for _, raw := range rawLogs { - stg, ok := stgs[raw.StorageID] - if !ok { - stg, err = svc.db.Storage().GetByID(svc.db.SQLCtx(), raw.StorageID) - if err != nil { - logger.WithField("PackageID", msg.PackageID).
- Warnf("getting storage: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "get storage failed") - } - - stgs[raw.StorageID] = stg - } - - logs = append(logs, coormq.PackageLoadLogDetail{ - Storage: stg, - UserID: raw.UserID, - CreateTime: raw.CreateTime, - }) + errMsg := fmt.Sprintf("batch add package access stat counter: %s", err.Error()) + logger.Error(errMsg) + return nil, mq.Failed(errorcode.OperationFailed, errMsg) } - return mq.ReplyOK(coormq.RespGetPackageLoadLogDetails(logs)) + return mq.ReplyOK(coormq.NewAddPackageAccessStatCounterResp()) } diff --git a/coordinator/internal/mq/storage.go b/coordinator/internal/mq/storage.go index 3556b3f..4bf759d 100644 --- a/coordinator/internal/mq/storage.go +++ b/coordinator/internal/mq/storage.go @@ -3,7 +3,6 @@ package mq import ( "database/sql" "fmt" - "time" "github.com/jmoiron/sqlx" "gitlink.org.cn/cloudream/common/consts/errorcode" @@ -54,11 +53,6 @@ func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coo return fmt.Errorf("creating storage package: %w", err) } - err = svc.db.StoragePackageLog().Create(tx, msg.StorageID, msg.PackageID, msg.UserID, time.Now()) - if err != nil { - return fmt.Errorf("creating storage package log: %w", err) - } - stg, err := svc.db.Storage().GetByID(tx, msg.StorageID) if err != nil { return fmt.Errorf("getting storage: %w", err) diff --git a/scanner/internal/config/config.go b/scanner/internal/config/config.go index 353e2e4..2b35bec 100644 --- a/scanner/internal/config/config.go +++ b/scanner/internal/config/config.go @@ -9,12 +9,13 @@ import ( ) type Config struct { - ECFileSizeThreshold int64 `json:"ecFileSizeThreshold"` - NodeUnavailableSeconds int `json:"nodeUnavailableSeconds"` // 如果节点上次上报时间超过这个值,则认为节点已经不可用 - Logger log.Config `json:"logger"` - DB db.Config `json:"db"` - RabbitMQ stgmq.Config `json:"rabbitMQ"` - DistLock distlock.Config `json:"distlock"` + AccessStatHistoryAmount float64 `json:"accessStatHistoryAmount"` + 
ECFileSizeThreshold int64 `json:"ecFileSizeThreshold"` + NodeUnavailableSeconds int `json:"nodeUnavailableSeconds"` // 如果节点上次上报时间超过这个值,则认为节点已经不可用 + Logger log.Config `json:"logger"` + DB db.Config `json:"db"` + RabbitMQ stgmq.Config `json:"rabbitMQ"` + DistLock distlock.Config `json:"distlock"` } var cfg Config diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index 92e42d0..84cf5c7 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -40,9 +40,8 @@ func NewCheckPackageRedundancy(evt *scevt.CheckPackageRedundancy) *CheckPackageR } type NodeLoadInfo struct { - Node cdssdk.Node - LoadsRecentMonth int - LoadsRecentYear int + Node cdssdk.Node + AccessAmount float64 } func (t *CheckPackageRedundancy) TryMerge(other Event) bool { @@ -62,6 +61,8 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { log.Debugf("end, time: %v", time.Since(startTime)) }() + // TODO 应该像其他event一样直接读取数据库 + coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { log.Warnf("new coordinator client: %s", err.Error()) @@ -75,9 +76,9 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { return } - getLogs, err := coorCli.GetPackageLoadLogDetails(coormq.ReqGetPackageLoadLogDetails(t.PackageID)) + stats, err := execCtx.Args.DB.PackageAccessStat().GetByPackageID(execCtx.Args.DB.SQLCtx(), t.PackageID) if err != nil { - log.Warnf("getting package load log details: %s", err.Error()) + log.Warnf("getting package access stats: %s", err.Error()) return } @@ -100,18 +101,12 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { } } - for _, log := range getLogs.Logs { - info, ok := userAllNodes[log.Storage.NodeID] + for _, stat := range stats { + info, ok := userAllNodes[stat.NodeID] if !ok { continue } - - sinceNow := time.Since(log.CreateTime) - if sinceNow.Hours() < monthHours { - info.LoadsRecentMonth++ - } else 
if sinceNow.Hours() < yearHours { - info.LoadsRecentYear++ - } + info.AccessAmount = stat.Amount } var changedObjects []coormq.UpdatingObjectRedundancy @@ -215,8 +210,11 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { func (t *CheckPackageRedundancy) chooseRedundancy(obj stgmod.ObjectDetail, userAllNodes map[cdssdk.NodeID]*NodeLoadInfo) (cdssdk.Redundancy, []*NodeLoadInfo) { switch obj.Object.Redundancy.(type) { case *cdssdk.NoneRedundancy: - newLRCNodes := t.chooseNewNodesForLRC(&cdssdk.DefaultLRCRedundancy, userAllNodes) - return &cdssdk.DefaultLRCRedundancy, newLRCNodes + newNodes := t.chooseNewNodesForEC(&cdssdk.DefaultECRedundancy, userAllNodes) + return &cdssdk.DefaultECRedundancy, newNodes + + // newLRCNodes := t.chooseNewNodesForLRC(&cdssdk.DefaultLRCRedundancy, userAllNodes) + // return &cdssdk.DefaultLRCRedundancy, newLRCNodes case *cdssdk.LRCRedundancy: newLRCNodes := t.rechooseNodesForLRC(obj, &cdssdk.DefaultLRCRedundancy, userAllNodes) @@ -263,12 +261,7 @@ func (t *CheckPackageRedundancy) summaryRepObjectBlockNodes(objs []stgmod.Object func (t *CheckPackageRedundancy) chooseNewNodesForRep(red *cdssdk.RepRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo { sortedNodes := sort2.Sort(lo.Values(allNodes), func(left *NodeLoadInfo, right *NodeLoadInfo) int { - dm := right.LoadsRecentMonth - left.LoadsRecentMonth - if dm != 0 { - return dm - } - - return right.LoadsRecentYear - left.LoadsRecentYear + return sort2.Cmp(right.AccessAmount, left.AccessAmount) }) return t.chooseSoManyNodes(red.RepCount, sortedNodes) @@ -276,12 +269,7 @@ func (t *CheckPackageRedundancy) chooseNewNodesForRep(red *cdssdk.RepRedundancy, func (t *CheckPackageRedundancy) chooseNewNodesForEC(red *cdssdk.ECRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo { sortedNodes := sort2.Sort(lo.Values(allNodes), func(left *NodeLoadInfo, right *NodeLoadInfo) int { - dm := right.LoadsRecentMonth - left.LoadsRecentMonth - if dm != 0 { - 
return dm - } - - return right.LoadsRecentYear - left.LoadsRecentYear + return sort2.Cmp(right.AccessAmount, left.AccessAmount) }) return t.chooseSoManyNodes(red.N, sortedNodes) @@ -289,12 +277,7 @@ func (t *CheckPackageRedundancy) chooseNewNodesForEC(red *cdssdk.ECRedundancy, a func (t *CheckPackageRedundancy) chooseNewNodesForLRC(red *cdssdk.LRCRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo { sortedNodes := sort2.Sort(lo.Values(allNodes), func(left *NodeLoadInfo, right *NodeLoadInfo) int { - dm := right.LoadsRecentMonth - left.LoadsRecentMonth - if dm != 0 { - return dm - } - - return right.LoadsRecentYear - left.LoadsRecentYear + return sort2.Cmp(right.AccessAmount, left.AccessAmount) }) return t.chooseSoManyNodes(red.N, sortedNodes) @@ -323,18 +306,13 @@ func (t *CheckPackageRedundancy) rechooseNodesForRep(mostBlockNodeIDs []cdssdk.N } sortedNodes := sort2.Sort(rechooseNodes, func(left *rechooseNode, right *rechooseNode) int { - dm := right.LoadsRecentMonth - left.LoadsRecentMonth - if dm != 0 { - return dm - } - // 已经缓存了文件块的节点优先选择 v := sort2.CmpBool(right.HasBlock, left.HasBlock) if v != 0 { return v } - return right.LoadsRecentYear - left.LoadsRecentYear + return sort2.Cmp(right.AccessAmount, left.AccessAmount) }) return t.chooseSoManyNodes(red.RepCount, lo.Map(sortedNodes, func(node *rechooseNode, idx int) *NodeLoadInfo { return node.NodeLoadInfo })) @@ -363,18 +341,13 @@ func (t *CheckPackageRedundancy) rechooseNodesForEC(obj stgmod.ObjectDetail, red } sortedNodes := sort2.Sort(rechooseNodes, func(left *rechooseNode, right *rechooseNode) int { - dm := right.LoadsRecentMonth - left.LoadsRecentMonth - if dm != 0 { - return dm - } - // 已经缓存了文件块的节点优先选择 v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1) if v != 0 { return v } - return right.LoadsRecentYear - left.LoadsRecentYear + return sort2.Cmp(right.AccessAmount, left.AccessAmount) }) // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择 @@ -404,18 +377,13 @@ func (t 
*CheckPackageRedundancy) rechooseNodesForLRC(obj stgmod.ObjectDetail, re } sortedNodes := sort2.Sort(rechooseNodes, func(left *rechooseNode, right *rechooseNode) int { - dm := right.LoadsRecentMonth - left.LoadsRecentMonth - if dm != 0 { - return dm - } - // 已经缓存了文件块的节点优先选择 v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1) if v != 0 { return v } - return right.LoadsRecentYear - left.LoadsRecentYear + return sort2.Cmp(right.AccessAmount, left.AccessAmount) }) // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择 diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go index 4cb52cb..df2ceba 100644 --- a/scanner/internal/event/clean_pinned.go +++ b/scanner/internal/event/clean_pinned.go @@ -54,6 +54,7 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { log.Debugf("end, time: %v", time.Since(startTime)) }() + // TODO 应该与其他event一样,直接访问数据库 coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { log.Warnf("new coordinator client: %s", err.Error()) @@ -67,12 +68,18 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { return } - getLoadLog, err := coorCli.GetPackageLoadLogDetails(coormq.ReqGetPackageLoadLogDetails(t.PackageID)) + stats, err := execCtx.Args.DB.PackageAccessStat().GetByPackageID(execCtx.Args.DB.SQLCtx(), t.PackageID) if err != nil { - log.Warnf("getting package load log details: %s", err.Error()) + log.Warnf("getting package access stat: %s", err.Error()) return } - readerNodeIDs := lo.Map(getLoadLog.Logs, func(item coormq.PackageLoadLogDetail, idx int) cdssdk.NodeID { return item.Storage.NodeID }) + var readerNodeIDs []cdssdk.NodeID + for _, item := range stats { + // TODO 可以考虑做成配置 + if item.Amount >= float64(len(getObjs.Objects)/2) { + readerNodeIDs = append(readerNodeIDs, item.NodeID) + } + } // 注意!需要保证allNodeID包含所有之后可能用到的节点ID // TOOD 可以考虑设计Cache机制 diff --git a/scanner/internal/event/update_package_access_stat_amount.go b/scanner/internal/event/update_package_access_stat_amount.go new 
file mode 100644 index 0000000..b2346dd --- /dev/null +++ b/scanner/internal/event/update_package_access_stat_amount.go @@ -0,0 +1,68 @@ +package event + +import ( + "time" + + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/logger" + scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" + "gitlink.org.cn/cloudream/storage/scanner/internal/config" +) + +type UpdatePackageAccessStatAmount struct { + *scevt.UpdatePackageAccessStatAmount +} + +func NewUpdatePackageAccessStatAmount(evt *scevt.UpdatePackageAccessStatAmount) *UpdatePackageAccessStatAmount { + return &UpdatePackageAccessStatAmount{ + UpdatePackageAccessStatAmount: evt, + } +} + +func (t *UpdatePackageAccessStatAmount) TryMerge(other Event) bool { + event, ok := other.(*UpdatePackageAccessStatAmount) + if !ok { + return false + } + + if t.PackageIDs == nil { + return true + } + + if event.PackageIDs == nil { + t.PackageIDs = nil + return true + } + + t.PackageIDs = append(t.PackageIDs, event.PackageIDs...) 
+ t.PackageIDs = lo.Uniq(t.PackageIDs) + return true +} + +func (t *UpdatePackageAccessStatAmount) Execute(execCtx ExecuteContext) { + log := logger.WithType[UpdatePackageAccessStatAmount]("Event") + startTime := time.Now() + log.Debugf("begin with %v", logger.FormatStruct(t.UpdatePackageAccessStatAmount)) + defer func() { + log.Debugf("end, time: %v", time.Since(startTime)) + }() + + if t.PackageIDs == nil { + err := execCtx.Args.DB.PackageAccessStat().UpdateAllAmount(execCtx.Args.DB.SQLCtx(), config.Cfg().AccessStatHistoryAmount) + if err != nil { + log.Warnf("update all package access stat amount: %v", err) + return + } + + } else { + err := execCtx.Args.DB.PackageAccessStat().BatchUpdateAmount(execCtx.Args.DB.SQLCtx(), t.PackageIDs, config.Cfg().AccessStatHistoryAmount) + if err != nil { + log.Warnf("batch update package access stat amount: %v", err) + return + } + } +} + +func init() { + RegisterMessageConvertor(NewUpdatePackageAccessStatAmount) +} diff --git a/scanner/internal/tickevent/update_all_package_access_stat_amount.go b/scanner/internal/tickevent/update_all_package_access_stat_amount.go new file mode 100644 index 0000000..7b724eb --- /dev/null +++ b/scanner/internal/tickevent/update_all_package_access_stat_amount.go @@ -0,0 +1,35 @@ +package tickevent + +import ( + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" + evt "gitlink.org.cn/cloudream/storage/scanner/internal/event" +) + +type UpdateAllPackageAccessStatAmount struct { + todayUpdated bool +} + +func NewUpdateAllPackageAccessStatAmount() *UpdateAllPackageAccessStatAmount { + return &UpdateAllPackageAccessStatAmount{} +} + +func (e *UpdateAllPackageAccessStatAmount) Execute(ctx ExecuteContext) { + log := logger.WithType[UpdateAllPackageAccessStatAmount]("TickEvent") + log.Debugf("begin") + defer log.Debugf("end") + + nowHour := time.Now().Hour() + if nowHour != 0 { + e.todayUpdated = false + return + } + if e.todayUpdated { + return + } + e.todayUpdated = true + 
ctx.Args.EventExecutor.Post(evt.NewUpdatePackageAccessStatAmount(event.NewUpdatePackageAccessStatAmount(nil))) +} diff --git a/scanner/main.go b/scanner/main.go index f17e269..064430b 100644 --- a/scanner/main.go +++ b/scanner/main.go @@ -130,4 +130,6 @@ func startTickEvent(tickExecutor *tickevent.Executor) { tickExecutor.Start(tickevent.NewBatchCheckPackageRedundancy(), interval, tickevent.StartOption{RandomStartDelayMs: 20 * 60 * 1000}) tickExecutor.Start(tickevent.NewBatchCleanPinned(), interval, tickevent.StartOption{RandomStartDelayMs: 20 * 60 * 1000}) + + tickExecutor.Start(tickevent.NewUpdateAllPackageAccessStatAmount(), interval, tickevent.StartOption{RandomStartDelayMs: 20 * 60 * 1000}) }