Browse Source

Merge pull request '增加Package访问量统计机制;ioswitch代码优化' (#32) from feature_gxh into master

gitlink
Sydonian 1 year ago
parent
commit
4a802f4c8b
49 changed files with 1514 additions and 1039 deletions
  1. +2
    -0
      agent/internal/task/cache_move_package.go
  2. +1
    -0
      agent/internal/task/storage_load_package.go
  3. +4
    -1
      agent/internal/task/task.go
  4. +49
    -17
      agent/main.go
  5. +4
    -0
      client/internal/cmdline/getp.go
  6. +2
    -0
      client/internal/cmdline/scanner.go
  7. +7
    -7
      client/internal/cmdline/test.go
  8. +20
    -10
      client/internal/http/object.go
  9. +10
    -7
      client/internal/services/service.go
  10. +32
    -1
      client/main.go
  11. +1
    -0
      common/assets/confs/scanner.config.json
  12. +6
    -5
      common/assets/scripts/create_database.sql
  13. +7
    -7
      common/pkgs/db/model/model.go
  14. +61
    -0
      common/pkgs/db/package_access_stat.go
  15. +0
    -39
      common/pkgs/db/storage_package_log.go
  16. +0
    -23
      common/pkgs/ioswitch2/ioswitch.go
  17. +76
    -27
      common/pkgs/ioswitch2/ops2/chunked.go
  18. +45
    -23
      common/pkgs/ioswitch2/ops2/clone.go
  19. +36
    -20
      common/pkgs/ioswitch2/ops2/ec.go
  20. +43
    -14
      common/pkgs/ioswitch2/ops2/file.go
  21. +53
    -20
      common/pkgs/ioswitch2/ops2/ipfs.go
  22. +84
    -66
      common/pkgs/ioswitch2/ops2/ops.go
  23. +21
    -11
      common/pkgs/ioswitch2/ops2/range.go
  24. +178
    -190
      common/pkgs/ioswitch2/parser/parser.go
  25. +0
    -17
      common/pkgs/ioswitch2/utils.go
  26. +3
    -5
      common/pkgs/ioswitchlrc/agent_worker.go
  27. +0
    -23
      common/pkgs/ioswitchlrc/ioswitch.go
  28. +55
    -29
      common/pkgs/ioswitchlrc/ops2/chunked.go
  29. +45
    -23
      common/pkgs/ioswitchlrc/ops2/clone.go
  30. +62
    -40
      common/pkgs/ioswitchlrc/ops2/ec.go
  31. +53
    -20
      common/pkgs/ioswitchlrc/ops2/ipfs.go
  32. +84
    -66
      common/pkgs/ioswitchlrc/ops2/ops.go
  33. +21
    -11
      common/pkgs/ioswitchlrc/ops2/range.go
  34. +56
    -58
      common/pkgs/ioswitchlrc/parser/generator.go
  35. +85
    -110
      common/pkgs/ioswitchlrc/parser/passes.go
  36. +0
    -17
      common/pkgs/ioswitchlrc/utils.go
  37. +33
    -0
      common/pkgs/mq/coordinator/package.go
  38. +0
    -35
      common/pkgs/mq/coordinator/storage.go
  39. +18
    -0
      common/pkgs/mq/scanner/event/update_package_access_stat_amount.go
  40. +7
    -0
      common/pkgs/package_stat/config.go
  41. +98
    -0
      common/pkgs/package_stat/package_stat.go
  42. +13
    -30
      coordinator/internal/mq/package.go
  43. +0
    -6
      coordinator/internal/mq/storage.go
  44. +7
    -6
      scanner/internal/config/config.go
  45. +20
    -52
      scanner/internal/event/check_package_redundancy.go
  46. +10
    -3
      scanner/internal/event/clean_pinned.go
  47. +68
    -0
      scanner/internal/event/update_package_access_stat_amount.go
  48. +32
    -0
      scanner/internal/tickevent/update_all_package_access_stat_amount.go
  49. +2
    -0
      scanner/main.go

+ 2
- 0
agent/internal/task/cache_move_package.go View File

@@ -76,6 +76,8 @@ func (t *CacheMovePackage) do(ctx TaskContext) error {
if err != nil {
return fmt.Errorf("creating ipfs file: %w", err)
}

ctx.packageStat.AddAccessCounter(t.packageID, *stgglb.Local.NodeID, 1)
}

_, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(t.packageID, *stgglb.Local.NodeID))


+ 1
- 0
agent/internal/task/storage_load_package.go View File

@@ -99,6 +99,7 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e
if err != nil {
return err
}
ctx.packageStat.AddAccessCounter(t.packageID, *stgglb.Local.NodeID, 1)
}

_, err = coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(t.userID, t.storageID, t.packageID, t.pinnedBlocks))


+ 4
- 1
agent/internal/task/task.go View File

@@ -5,12 +5,14 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
packagestat "gitlink.org.cn/cloudream/storage/common/pkgs/package_stat"
)

type TaskContext struct {
distlock *distlock.Service
connectivity *connectivity.Collector
downloader *downloader.Downloader
packageStat *packagestat.PackageStat
}

// 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用,
@@ -25,10 +27,11 @@ type Task = task.Task[TaskContext]

type CompleteOption = task.CompleteOption

func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader) Manager {
func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, packageStat *packagestat.PackageStat) Manager {
return task.NewManager(TaskContext{
distlock: distlock,
connectivity: connectivity,
downloader: downloader,
packageStat: packageStat,
})
}

+ 49
- 17
agent/main.go View File

@@ -4,9 +4,10 @@ import (
"fmt"
"net"
"os"
"time"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/agent/internal/config"
"gitlink.org.cn/cloudream/storage/agent/internal/task"
@@ -15,6 +16,7 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
packagestat "gitlink.org.cn/cloudream/storage/common/pkgs/package_stat"

"google.golang.org/grpc"

@@ -38,7 +40,7 @@ func main() {
os.Exit(1)
}

err = log.Init(&config.Cfg().Logger)
err = logger.Init(&config.Cfg().Logger)
if err != nil {
fmt.Printf("init logger failed, err: %s", err.Error())
os.Exit(1)
@@ -51,7 +53,7 @@ func main() {

// 启动网络连通性检测,并就地检测一次
conCol := connectivity.NewCollector(&config.Cfg().Connectivity, func(collector *connectivity.Collector) {
log := log.WithField("Connectivity", "")
log := logger.WithField("Connectivity", "")

coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
@@ -84,25 +86,31 @@ func main() {
})
conCol.CollectInPlace()

pkgStat := packagestat.NewPackageStat(packagestat.Config{
// TODO 考虑放到配置里
ReportInterval: time.Second * 10,
})
go servePackageStat(pkgStat)

distlock, err := distlock.NewService(&config.Cfg().DistLock)
if err != nil {
log.Fatalf("new ipfs failed, err: %s", err.Error())
logger.Fatalf("new ipfs failed, err: %s", err.Error())
}

sw := exec.NewWorker()

dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol)

taskMgr := task.NewManager(distlock, &conCol, &dlder)
taskMgr := task.NewManager(distlock, &conCol, &dlder, pkgStat)

// 启动命令服务器
// TODO 需要设计AgentID持久化机制
agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr), config.Cfg().ID, &config.Cfg().RabbitMQ)
if err != nil {
log.Fatalf("new agent server failed, err: %s", err.Error())
logger.Fatalf("new agent server failed, err: %s", err.Error())
}
agtSvr.OnError(func(err error) {
log.Warnf("agent server err: %s", err.Error())
logger.Warnf("agent server err: %s", err.Error())
})
go serveAgentServer(agtSvr)

@@ -110,7 +118,7 @@ func main() {
listenAddr := config.Cfg().GRPC.MakeListenAddress()
lis, err := net.Listen("tcp", listenAddr)
if err != nil {
log.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error())
logger.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error())
}
s := grpc.NewServer()
agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&sw))
@@ -123,45 +131,69 @@ func main() {
}

func serveAgentServer(server *agtmq.Server) {
log.Info("start serving command server")
logger.Info("start serving command server")

err := server.Serve()

if err != nil {
log.Errorf("command server stopped with error: %s", err.Error())
logger.Errorf("command server stopped with error: %s", err.Error())
}

log.Info("command server stopped")
logger.Info("command server stopped")

// TODO 仅简单结束了程序
os.Exit(1)
}

func serveGRPC(s *grpc.Server, lis net.Listener) {
log.Info("start serving grpc")
logger.Info("start serving grpc")

err := s.Serve(lis)

if err != nil {
log.Errorf("grpc stopped with error: %s", err.Error())
logger.Errorf("grpc stopped with error: %s", err.Error())
}

log.Info("grpc stopped")
logger.Info("grpc stopped")

// TODO 仅简单结束了程序
os.Exit(1)
}

func serveDistLock(svc *distlock.Service) {
log.Info("start serving distlock")
logger.Info("start serving distlock")

err := svc.Serve()

if err != nil {
log.Errorf("distlock stopped with error: %s", err.Error())
logger.Errorf("distlock stopped with error: %s", err.Error())
}

log.Info("distlock stopped")
logger.Info("distlock stopped")

// TODO 仅简单结束了程序
os.Exit(1)
}

func servePackageStat(svc *packagestat.PackageStat) {
logger.Info("start serving package stat")

ch := svc.Start()
loop:
for {
val, err := ch.Receive()
if err != nil {
logger.Errorf("package stat stopped with error: %v", err)
break
}

switch val := val.(type) {
case error:
logger.Errorf("package stat stopped with error: %v", val)
break loop
}
}
logger.Info("package stat stopped")

// TODO 仅简单结束了程序
	os.Exit(1)
}

+ 4
- 0
client/internal/cmdline/getp.go View File

@@ -12,6 +12,7 @@ import (
"github.com/inhies/go-bytesize"
"github.com/spf13/cobra"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
)

@@ -117,6 +118,9 @@ func getpByID(cmdCtx *CommandContext, id cdssdk.PackageID, output string) {
return fmt.Errorf("copy object data to local file failed, err: %w", err)
}

if stgglb.Local.NodeID != nil {
cmdCtx.Cmdline.Svc.PackageStat.AddAccessCounter(id, *stgglb.Local.NodeID, 1)
}
return nil
}()
if err != nil {


+ 2
- 0
client/internal/cmdline/scanner.go View File

@@ -41,5 +41,7 @@ func init() {

parseScannerEventCmdTrie.MustAdd(scevt.NewCleanPinned, reflect2.TypeNameOf[scevt.CleanPinned]())

parseScannerEventCmdTrie.MustAdd(scevt.NewUpdatePackageAccessStatAmount, reflect2.TypeNameOf[scevt.UpdatePackageAccessStatAmount]())

commands.MustAdd(ScannerPostEvent, "scanner", "event")
}

+ 7
- 7
client/internal/cmdline/test.go View File

@@ -19,8 +19,8 @@ import (

func init() {
rootCmd.AddCommand(&cobra.Command{
Use: "test2",
Short: "test2",
Use: "test",
Short: "test",
// Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
// cmdCtx := GetCmdCtx(cmd)
@@ -160,8 +160,8 @@ func init() {
})

rootCmd.AddCommand(&cobra.Command{
Use: "test",
Short: "test",
Use: "test4",
Short: "test4",
// Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
// cmdCtx := GetCmdCtx(cmd)
@@ -202,8 +202,8 @@ func init() {
})

rootCmd.AddCommand(&cobra.Command{
Use: "test4",
Short: "test4",
Use: "test3",
Short: "test3",
// Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
// cmdCtx := GetCmdCtx(cmd)
@@ -228,7 +228,7 @@ func init() {
ioswitchlrc.NewFromNode("QmQBKncEDqxw3BrGr3th3gS3jUC2fizGz1w29ZxxrrKfNv", &nodes.Nodes[0], 2),
}, []ioswitchlrc.To{
ioswitchlrc.NewToNodeWithRange(nodes.Nodes[1], -1, "-1", exec.Range{0, &le}),
ioswitchlrc.NewToNode(nodes.Nodes[1], 0, "0"),
ioswitchlrc.NewToNodeWithRange(nodes.Nodes[1], 0, "0", exec.Range{10, &le}),
ioswitchlrc.NewToNode(nodes.Nodes[1], 1, "1"),
ioswitchlrc.NewToNode(nodes.Nodes[1], 2, "2"),
ioswitchlrc.NewToNode(nodes.Nodes[1], 3, "3"),


+ 20
- 10
client/internal/http/object.go View File

@@ -14,6 +14,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
)

@@ -121,42 +122,51 @@ func (s *ObjectService) Download(ctx *gin.Context) {
ctx.Writer.Header().Set("Content-Type", fmt.Sprintf("%s;boundary=%s", myhttp.ContentTypeMultiPart, mw.Boundary()))
ctx.Writer.WriteHeader(http.StatusOK)

sendSize := int64(0)
if req.PartSize == 0 {
err = sendFileOnePart(mw, "file", path.Base(file.Object.Path), file.File)
sendSize, err = sendFileOnePart(mw, "file", path.Base(file.Object.Path), file.File)
} else {
err = sendFileMultiPart(mw, "file", path.Base(file.Object.Path), file.File, req.PartSize)
sendSize, err = sendFileMultiPart(mw, "file", path.Base(file.Object.Path), file.File, req.PartSize)
}

// TODO 当client不在某个代理节点上时如何处理?
if stgglb.Local.NodeID != nil {
s.svc.PackageStat.AddAccessCounter(file.Object.PackageID, *stgglb.Local.NodeID, float64(sendSize)/float64(file.Object.Size))
}

if err != nil {
log.Warnf("copying file: %s", err.Error())
}
}

func sendFileMultiPart(muWriter *multipart.Writer, fieldName, fileName string, file io.ReadCloser, partSize int64) error {
func sendFileMultiPart(muWriter *multipart.Writer, fieldName, fileName string, file io.ReadCloser, partSize int64) (int64, error) {
total := int64(0)
for {
w, err := muWriter.CreateFormFile(fieldName, ul.PathEscape(fileName))
if err != nil {
return fmt.Errorf("create form file failed, err: %w", err)
return 0, fmt.Errorf("create form file failed, err: %w", err)
}

n, err := io.Copy(w, io.LimitReader(file, partSize))
if err != nil {
return err
return total, err
}
if n == 0 {
break
}
total += n
}
return nil
return total, nil
}

func sendFileOnePart(muWriter *multipart.Writer, fieldName, fileName string, file io.ReadCloser) error {
func sendFileOnePart(muWriter *multipart.Writer, fieldName, fileName string, file io.ReadCloser) (int64, error) {
w, err := muWriter.CreateFormFile(fieldName, ul.PathEscape(fileName))
if err != nil {
return fmt.Errorf("create form file failed, err: %w", err)
return 0, fmt.Errorf("create form file failed, err: %w", err)
}

_, err = io.Copy(w, file)
return err
n, err := io.Copy(w, file)
return n, err
}

func (s *ObjectService) UpdateInfo(ctx *gin.Context) {


+ 10
- 7
client/internal/services/service.go View File

@@ -4,18 +4,21 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage/client/internal/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
packagestat "gitlink.org.cn/cloudream/storage/common/pkgs/package_stat"
)

type Service struct {
DistLock *distlock.Service
TaskMgr *task.Manager
Downloader *downloader.Downloader
DistLock *distlock.Service
TaskMgr *task.Manager
Downloader *downloader.Downloader
PackageStat *packagestat.PackageStat
}

func NewService(distlock *distlock.Service, taskMgr *task.Manager, downloader *downloader.Downloader) (*Service, error) {
func NewService(distlock *distlock.Service, taskMgr *task.Manager, downloader *downloader.Downloader, pkgStat *packagestat.PackageStat) (*Service, error) {
return &Service{
DistLock: distlock,
TaskMgr: taskMgr,
Downloader: downloader,
DistLock: distlock,
TaskMgr: taskMgr,
Downloader: downloader,
PackageStat: pkgStat,
}, nil
}

+ 32
- 1
client/main.go View File

@@ -18,6 +18,7 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
packagestat "gitlink.org.cn/cloudream/storage/common/pkgs/package_stat"
)

func main() {
@@ -83,11 +84,17 @@ func main() {
}
go serveDistLock(distlockSvc)

pkgStat := packagestat.NewPackageStat(packagestat.Config{
// TODO 考虑放到配置里
ReportInterval: time.Second * 10,
})
go servePackageStat(pkgStat)

taskMgr := task.NewManager(distlockSvc, &conCol)

dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol)

svc, err := services.NewService(distlockSvc, &taskMgr, &dlder)
svc, err := services.NewService(distlockSvc, &taskMgr, &dlder, pkgStat)
if err != nil {
logger.Warnf("new services failed, err: %s", err.Error())
os.Exit(1)
@@ -116,3 +123,27 @@ func serveDistLock(svc *distlock.Service) {
// TODO 仅简单结束了程序
os.Exit(1)
}

// servePackageStat drives the package-access statistics reporter. It blocks
// until the reporter's event channel delivers an error (or the channel itself
// fails), then terminates the whole process.
func servePackageStat(svc *packagestat.PackageStat) {
	logger.Info("start serving package stat")

	ch := svc.Start()
	for {
		val, err := ch.Receive()
		if err != nil {
			// The channel itself broke; stop consuming events.
			logger.Errorf("package stat stopped with error: %v", err)
			break
		}

		// Any error-valued event means the reporter has failed; exit the loop.
		if evtErr, ok := val.(error); ok {
			logger.Errorf("package stat stopped with error: %v", evtErr)
			break
		}
	}
	logger.Info("package stat stopped")

	// TODO for now the whole process is simply terminated
	os.Exit(1)
}

+ 1
- 0
common/assets/confs/scanner.config.json View File

@@ -1,4 +1,5 @@
{
"accessStatHistoryAmount": 0.8,
"ecFileSizeThreshold": 104857600,
"nodeUnavailableSeconds": 300,
"logger": {


+ 6
- 5
common/assets/scripts/create_database.sql View File

@@ -114,7 +114,7 @@ create table Package (
PackageID int not null auto_increment primary key comment '包ID',
Name varchar(100) not null comment '对象名',
BucketID int not null comment '桶ID',
State varchar(100) not null comment '状态'
State varchar(100) not null comment '状态',
);

create table Object (
@@ -160,11 +160,12 @@ create table StoragePackage (
primary key(StorageID, PackageID, UserID)
);

create table StoragePackageLog (
StorageID int not null comment '存储服务ID',
create table PackageAccessStat (
PackageID int not null comment '包ID',
UserID int not null comment '调度了此文件的用户ID',
CreateTime timestamp not null comment '加载Package完成的时间'
NodeID int not null comment '节点ID',
Amount float not null comment '前一日流量的滑动平均值',
Counter float not null comment '本日的流量',
primary key(PackageID, NodeID)
);

create table Location (


+ 7
- 7
common/pkgs/db/model/model.go View File

@@ -96,14 +96,14 @@ type StoragePackage struct {
State string `db:"State" json:"state"`
}

type StoragePackageLog struct {
StorageID cdssdk.StorageID `db:"StorageID" json:"storageID"`
PackageID cdssdk.PackageID `db:"PackageID" json:"packageID"`
UserID cdssdk.UserID `db:"UserID" json:"userID"`
CreateTime time.Time `db:"CreateTime" json:"createTime"`
}

// Location describes a named location that nodes can be associated with.
type Location struct {
	LocationID cdssdk.LocationID `db:"LocationID" json:"locationID"`
	Name       string            `db:"Name" json:"name"`
}

// PackageAccessStat records per-(package, node) access-traffic statistics:
// a running counter for the current day and a smoothed amount carried over
// from previous days.
// NOTE(review): the json tag "Amount" is UpperCamelCase while every other tag
// in this file is lowerCamelCase — confirm whether API consumers depend on
// this casing before normalizing it.
type PackageAccessStat struct {
	PackageID cdssdk.PackageID `db:"PackageID" json:"packageID"`
	NodeID    cdssdk.NodeID    `db:"NodeID" json:"nodeID"`
	Amount    float64          `db:"Amount" json:"Amount"`   // sliding average of the previous day's read volume
	Counter   float64          `db:"Counter" json:"counter"` // read volume accumulated during the current day
}

+ 61
- 0
common/pkgs/db/package_access_stat.go View File

@@ -0,0 +1,61 @@
package db

import (
"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

type PackageAccessStatDB struct {
*DB
}

func (db *DB) PackageAccessStat() *PackageAccessStatDB {
return &PackageAccessStatDB{db}
}

func (*PackageAccessStatDB) Get(ctx SQLContext, pkgID cdssdk.PackageID, nodeID cdssdk.NodeID) (model.PackageAccessStat, error) {
var ret model.PackageAccessStat
err := sqlx.Get(ctx, &ret, "select * from PackageAccessStat where PackageID=? and NodeID=?", pkgID, nodeID)
return ret, err
}

func (*PackageAccessStatDB) GetByPackageID(ctx SQLContext, pkgID cdssdk.PackageID) ([]model.PackageAccessStat, error) {
var ret []model.PackageAccessStat
err := sqlx.Select(ctx, &ret, "select * from PackageAccessStat where PackageID=?", pkgID)
return ret, err
}

func (*PackageAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.AddPackageAccessStatCounterEntry) error {
sql := "insert into PackageAccessStat(PackageID, NodeID, Counter, Amount) " +
"values(:PackageID, :NodeID, :Value, 0)" +
"on duplicate key update Counter=Counter+:Value"
err := BatchNamedExec(ctx, sql, 4, entries, nil)
return err
}

func (*PackageAccessStatDB) BatchUpdateAmount(ctx SQLContext, pkgIDs []cdssdk.PackageID, historyWeight float64) error {
stmt, args, err := sqlx.In("update PackageAccessStat set Amount=Amount*?+Counter*(1-?), Counter = 0 where PackageID in (?)", historyWeight, historyWeight, pkgIDs)
if err != nil {
return err
}

_, err = ctx.Exec(stmt, args...)
return err
}

func (*PackageAccessStatDB) UpdateAllAmount(ctx SQLContext, historyWeight float64) error {
stmt, args, err := sqlx.In("update PackageAccessStat set Amount=Amount*?+Counter*(1-?), Counter = 0", historyWeight, historyWeight)
if err != nil {
return err
}

_, err = ctx.Exec(stmt, args...)
return err
}

func (*PackageAccessStatDB) DeleteByPackageID(ctx SQLContext, pkgID cdssdk.PackageID) error {
_, err := ctx.Exec("delete from PackageAccessStat where PackageID=?", pkgID)
return err
}

+ 0
- 39
common/pkgs/db/storage_package_log.go View File

@@ -1,39 +0,0 @@
package db

import (
"time"

"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

type StoragePackageLogDB struct {
*DB
}

func (db *DB) StoragePackageLog() *StoragePackageLogDB {
return &StoragePackageLogDB{DB: db}
}

func (*StoragePackageLogDB) Get(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) (model.StoragePackageLog, error) {
var ret model.StoragePackageLog
err := sqlx.Get(ctx, &ret, "select * from StoragePackageLog where StorageID = ? and PackageID = ? and UserID = ?", storageID, packageID, userID)
return ret, err
}

func (*StoragePackageLogDB) GetByPackageID(ctx SQLContext, packageID cdssdk.PackageID) ([]model.StoragePackageLog, error) {
var ret []model.StoragePackageLog
err := sqlx.Select(ctx, &ret, "select * from StoragePackageLog where PackageID = ?", packageID)
return ret, err
}

func (*StoragePackageLogDB) Create(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID, createTime time.Time) error {
_, err := ctx.Exec("insert into StoragePackageLog values(?,?,?,?)", storageID, packageID, userID, createTime)
return err
}

func (*StoragePackageLogDB) Delete(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) error {
_, err := ctx.Exec("delete from StoragePackageLog where StorageID = ? and PackageID = ? and UserID = ?", storageID, packageID, userID)
return err
}

+ 0
- 23
common/pkgs/ioswitch2/ioswitch.go View File

@@ -1,23 +0,0 @@
package ioswitch2

import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

type NodeProps struct {
From From
To To
}

type ValueVarType int

const (
StringValueVar ValueVarType = iota
SignalValueVar
)

type VarProps struct {
StreamIndex int // 流的编号,只在StreamVar上有意义
ValueType ValueVarType // 值类型,只在ValueVar上有意义
Var exec.Var // 生成Plan的时候创建的对应的Var
}

+ 76
- 27
common/pkgs/ioswitch2/ops2/chunked.go View File

@@ -11,7 +11,6 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
"golang.org/x/sync/semaphore"
)

@@ -101,24 +100,54 @@ func (o *ChunkedJoin) String() string {
)
}

type ChunkedSplitType struct {
OutputCount int
ChunkSize int
type ChunkedSplitNode struct {
dag.NodeBase
ChunkSize int
}

func (t *ChunkedSplitType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
for i := 0; i < t.OutputCount; i++ {
dag.NodeNewOutputStream(node, &ioswitch2.VarProps{
StreamIndex: i,
})
func (b *GraphNodeBuilder) NewChunkedSplit(chunkSize int) *ChunkedSplitNode {
node := &ChunkedSplitNode{
ChunkSize: chunkSize,
}
b.AddNode(node)
return node
}

func (t *ChunkedSplitNode) Split(input *dag.StreamVar, cnt int) {
t.InputStreams().EnsureSize(1)
input.Connect(t, 0)
t.OutputStreams().Resize(cnt)
for i := 0; i < cnt; i++ {
t.OutputStreams().Setup(t, t.Graph().NewStreamVar(), i)
}
}

func (t *ChunkedSplitNode) SubStream(idx int) *dag.StreamVar {
return t.OutputStreams().Get(idx)
}

func (t *ChunkedSplitNode) SplitCount() int {
return t.OutputStreams().Len()
}

func (t *ChunkedSplitNode) Clear() {
if t.InputStreams().Len() == 0 {
return
}

t.InputStreams().Get(0).Disconnect(t, 0)
t.InputStreams().Resize(0)

for _, out := range t.OutputStreams().RawArray() {
out.DisconnectAll()
}
t.OutputStreams().Resize(0)
}

func (t *ChunkedSplitType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *ChunkedSplitNode) GenerateOp() (exec.Op, error) {
return &ChunkedSplit{
Input: op.InputStreams[0].Var,
Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar {
Input: t.InputStreams().Get(0).Var,
Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar {
return v.Var
}),
ChunkSize: t.ChunkSize,
@@ -126,30 +155,50 @@ func (t *ChunkedSplitType) GenerateOp(op *dag.Node) (exec.Op, error) {
}, nil
}

func (t *ChunkedSplitType) String(node *dag.Node) string {
return fmt.Sprintf("ChunkedSplit[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node))
// func (t *ChunkedSplitNode) String() string {
// return fmt.Sprintf("ChunkedSplit[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node))
// }

type ChunkedJoinNode struct {
dag.NodeBase
ChunkSize int
}

type ChunkedJoinType struct {
InputCount int
ChunkSize int
func (b *GraphNodeBuilder) NewChunkedJoin(chunkSize int) *ChunkedJoinNode {
node := &ChunkedJoinNode{
ChunkSize: chunkSize,
}
b.AddNode(node)
node.OutputStreams().SetupNew(node, b.Graph.NewStreamVar())
return node
}

func (t *ChunkedJoinType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, t.InputCount)
dag.NodeNewOutputStream(node, &ioswitch2.VarProps{})
func (t *ChunkedJoinNode) AddInput(str *dag.StreamVar) {
idx := t.InputStreams().EnlargeOne()
str.Connect(t, idx)
}

func (t *ChunkedJoinType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *ChunkedJoinNode) Joined() *dag.StreamVar {
return t.OutputStreams().Get(0)
}

func (t *ChunkedJoinNode) RemoveAllInputs() {
for i, in := range t.InputStreams().RawArray() {
in.Disconnect(t, i)
}
t.InputStreams().Resize(0)
}

func (t *ChunkedJoinNode) GenerateOp() (exec.Op, error) {
return &ChunkedJoin{
Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar {
Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar {
return v.Var
}),
Output: op.OutputStreams[0].Var,
Output: t.OutputStreams().Get(0).Var,
ChunkSize: t.ChunkSize,
}, nil
}

func (t *ChunkedJoinType) String(node *dag.Node) string {
return fmt.Sprintf("ChunkedJoin[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node))
}
// func (t *ChunkedJoinType) String() string {
// return fmt.Sprintf("ChunkedJoin[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node))
// }

+ 45
- 23
common/pkgs/ioswitch2/ops2/clone.go View File

@@ -74,48 +74,70 @@ func (o *CloneVar) String() string {
return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw.GetID(), utils.FormatVarIDs(o.Cloneds))
}

type CloneStreamType struct{}
type CloneStreamType struct {
dag.NodeBase
}

func (b *GraphNodeBuilder) NewCloneStream() *CloneStreamType {
node := &CloneStreamType{}
b.AddNode(node)
return node
}

func (t *CloneStreamType) SetInput(raw *dag.StreamVar) {
t.InputStreams().EnsureSize(1)
raw.Connect(t, 0)
}

func (t *CloneStreamType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
func (t *CloneStreamType) NewOutput() *dag.StreamVar {
output := t.Graph().NewStreamVar()
t.OutputStreams().SetupNew(t, output)
return output
}

func (t *CloneStreamType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *CloneStreamType) GenerateOp() (exec.Op, error) {
return &CloneStream{
Raw: op.InputStreams[0].Var,
Cloneds: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar {
Raw: t.InputStreams().Get(0).Var,
Cloneds: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar {
return v.Var
}),
}, nil
}

func (t *CloneStreamType) NewOutput(node *dag.Node) *dag.StreamVar {
return dag.NodeNewOutputStream(node, nil)
// func (t *CloneStreamType) String() string {
// return fmt.Sprintf("CloneStream[]%v%v", formatStreamIO(node), formatValueIO(node))
// }

type CloneVarType struct {
dag.NodeBase
}

func (t *CloneStreamType) String(node *dag.Node) string {
return fmt.Sprintf("CloneStream[]%v%v", formatStreamIO(node), formatValueIO(node))
func (b *GraphNodeBuilder) NewCloneValue() *CloneVarType {
node := &CloneVarType{}
b.AddNode(node)
return node
}

type CloneVarType struct{}
func (t *CloneVarType) SetInput(raw *dag.ValueVar) {
t.InputValues().EnsureSize(1)
raw.Connect(t, 0)
}

func (t *CloneVarType) InitNode(node *dag.Node) {
dag.NodeDeclareInputValue(node, 1)
func (t *CloneVarType) NewOutput() *dag.ValueVar {
output := t.Graph().NewValueVar(t.InputValues().Get(0).Type)
t.OutputValues().SetupNew(t, output)
return output
}

func (t *CloneVarType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *CloneVarType) GenerateOp() (exec.Op, error) {
return &CloneVar{
Raw: op.InputValues[0].Var,
Cloneds: lo.Map(op.OutputValues, func(v *dag.ValueVar, idx int) exec.Var {
Raw: t.InputValues().Get(0).Var,
Cloneds: lo.Map(t.OutputValues().RawArray(), func(v *dag.ValueVar, idx int) exec.Var {
return v.Var
}),
}, nil
}

func (t *CloneVarType) NewOutput(node *dag.Node) *dag.ValueVar {
return dag.NodeNewOutputValue(node, node.InputValues[0].Type, nil)
}

func (t *CloneVarType) String(node *dag.Node) string {
return fmt.Sprintf("CloneVar[]%v%v", formatStreamIO(node), formatValueIO(node))
}
// func (t *CloneVarType) String() string {
// return fmt.Sprintf("CloneVar[]%v%v", formatStreamIO(node), formatValueIO(node))
// }

+ 36
- 20
common/pkgs/ioswitch2/ops2/ec.go View File

@@ -14,7 +14,6 @@ import (
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/sync2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
"golang.org/x/sync/semaphore"
)

@@ -204,15 +203,43 @@ func (o *ECMultiply) String() string {
)
}

type MultiplyType struct {
type ECMultiplyNode struct {
dag.NodeBase
EC cdssdk.ECRedundancy
InputIndexes []int
OutputIndexes []int
}

func (t *MultiplyType) InitNode(node *dag.Node) {}
func (b *GraphNodeBuilder) NewECMultiply(ec cdssdk.ECRedundancy) *ECMultiplyNode {
node := &ECMultiplyNode{
EC: ec,
}
b.AddNode(node)
return node
}

func (t *ECMultiplyNode) AddInput(str *dag.StreamVar, dataIndex int) {
t.InputIndexes = append(t.InputIndexes, dataIndex)
idx := t.InputStreams().EnlargeOne()
str.Connect(t, idx)
}

func (t *ECMultiplyNode) RemoveAllInputs() {
for i, in := range t.InputStreams().RawArray() {
in.Disconnect(t, i)
}
t.InputStreams().Resize(0)
t.InputIndexes = nil
}

func (t *ECMultiplyNode) NewOutput(dataIndex int) *dag.StreamVar {
t.OutputIndexes = append(t.OutputIndexes, dataIndex)
output := t.Graph().NewStreamVar()
t.OutputStreams().SetupNew(t, output)
return output
}

func (t *MultiplyType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *ECMultiplyNode) GenerateOp() (exec.Op, error) {
rs, err := ec.NewRs(t.EC.K, t.EC.N)
if err != nil {
return nil, err
@@ -224,23 +251,12 @@ func (t *MultiplyType) GenerateOp(op *dag.Node) (exec.Op, error) {

return &ECMultiply{
Coef: coef,
Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }),
Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }),
Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }),
Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }),
ChunkSize: t.EC.ChunkSize,
}, nil
}

func (t *MultiplyType) AddInput(node *dag.Node, str *dag.StreamVar, dataIndex int) {
t.InputIndexes = append(t.InputIndexes, dataIndex)
node.InputStreams = append(node.InputStreams, str)
str.To(node, len(node.InputStreams)-1)
}

func (t *MultiplyType) NewOutput(node *dag.Node, dataIndex int) *dag.StreamVar {
t.OutputIndexes = append(t.OutputIndexes, dataIndex)
return dag.NodeNewOutputStream(node, &ioswitch2.VarProps{StreamIndex: dataIndex})
}

func (t *MultiplyType) String(node *dag.Node) string {
return fmt.Sprintf("Multiply[]%v%v", formatStreamIO(node), formatValueIO(node))
}
// func (t *MultiplyType) String() string {
// return fmt.Sprintf("Multiply[]%v%v", formatStreamIO(node), formatValueIO(node))
// }

+ 43
- 14
common/pkgs/ioswitch2/ops2/file.go View File

@@ -11,7 +11,6 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
)

func init() {
@@ -80,36 +79,66 @@ func (o *FileRead) String() string {
return fmt.Sprintf("FileRead %s -> %v", o.FilePath, o.Output.ID)
}

type FileReadType struct {
type FileReadNode struct {
dag.NodeBase
FilePath string
}

func (t *FileReadType) InitNode(node *dag.Node) {
dag.NodeNewOutputStream(node, &ioswitch2.VarProps{})
func (b *GraphNodeBuilder) NewFileRead(filePath string) *FileReadNode {
node := &FileReadNode{
FilePath: filePath,
}
b.AddNode(node)
node.OutputStreams().SetupNew(node, b.NewStreamVar())
return node
}

func (t *FileReadNode) Output() dag.StreamSlot {
return dag.StreamSlot{
Var: t.OutputStreams().Get(0),
Index: 0,
}
}

func (t *FileReadType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *FileReadNode) GenerateOp() (exec.Op, error) {
return &FileRead{
Output: op.OutputStreams[0].Var,
Output: t.OutputStreams().Get(0).Var,
FilePath: t.FilePath,
}, nil
}

func (t *FileReadType) String(node *dag.Node) string {
return fmt.Sprintf("FileRead[%s]%v%v", t.FilePath, formatStreamIO(node), formatValueIO(node))
}
// func (t *FileReadType) String() string {
// return fmt.Sprintf("FileRead[%s]%v%v", t.FilePath, formatStreamIO(node), formatValueIO(node))
// }

type FileWriteType struct {
type FileWriteNode struct {
dag.NodeBase
FilePath string
}

func (t *FileWriteType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
func (b *GraphNodeBuilder) NewFileWrite(filePath string) *FileWriteNode {
node := &FileWriteNode{
FilePath: filePath,
}
b.AddNode(node)
return node
}

func (t *FileWriteNode) Input() dag.StreamSlot {
return dag.StreamSlot{
Var: t.InputStreams().Get(0),
Index: 0,
}
}

func (t *FileWriteNode) SetInput(str *dag.StreamVar) {
t.InputStreams().EnsureSize(1)
str.Connect(t, 0)
}

func (t *FileWriteType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *FileWriteNode) GenerateOp() (exec.Op, error) {
return &FileWrite{
Input: op.InputStreams[0].Var,
Input: t.InputStreams().Get(0).Var,
FilePath: t.FilePath,
}, nil
}

+ 53
- 20
common/pkgs/ioswitch2/ops2/ipfs.go View File

@@ -12,7 +12,6 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/io2"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
)

func init() {
@@ -94,44 +93,78 @@ func (o *IPFSWrite) String() string {
return fmt.Sprintf("IPFSWrite %v -> %v", o.Input.ID, o.FileHash.ID)
}

type IPFSReadType struct {
type IPFSReadNode struct {
dag.NodeBase
FileHash string
Option ipfs.ReadOption
}

func (t *IPFSReadType) InitNode(node *dag.Node) {
dag.NodeNewOutputStream(node, &ioswitch2.VarProps{})
func (b *GraphNodeBuilder) NewIPFSRead(fileHash string, option ipfs.ReadOption) *IPFSReadNode {
node := &IPFSReadNode{
FileHash: fileHash,
Option: option,
}
b.AddNode(node)
node.OutputStreams().SetupNew(node, b.NewStreamVar())
return node
}

func (t *IPFSReadNode) Output() dag.StreamSlot {
return dag.StreamSlot{
Var: t.OutputStreams().Get(0),
Index: 0,
}
}

func (t *IPFSReadType) GenerateOp(n *dag.Node) (exec.Op, error) {
func (t *IPFSReadNode) GenerateOp() (exec.Op, error) {
return &IPFSRead{
Output: n.OutputStreams[0].Var,
Output: t.OutputStreams().Get(0).Var,
FileHash: t.FileHash,
Option: t.Option,
}, nil
}

func (t *IPFSReadType) String(node *dag.Node) string {
return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node))
}
// func (t *IPFSReadType) String() string {
// return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node))
// }

type IPFSWriteType struct {
type IPFSWriteNode struct {
dag.NodeBase
FileHashStoreKey string
Range exec.Range
}

func (t *IPFSWriteType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
dag.NodeNewOutputValue(node, dag.StringValueVar, &ioswitch2.VarProps{})
func (b *GraphNodeBuilder) NewIPFSWrite(fileHashStoreKey string) *IPFSWriteNode {
node := &IPFSWriteNode{
FileHashStoreKey: fileHashStoreKey,
}
b.AddNode(node)
return node
}

func (t *IPFSWriteNode) SetInput(input *dag.StreamVar) {
t.InputStreams().EnsureSize(1)
input.Connect(t, 0)
t.OutputValues().SetupNew(t, t.Graph().NewValueVar(dag.StringValueVar))
}

func (t *IPFSWriteNode) Input() dag.StreamSlot {
return dag.StreamSlot{
Var: t.InputStreams().Get(0),
Index: 0,
}
}

func (t *IPFSWriteType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *IPFSWriteNode) FileHashVar() *dag.ValueVar {
return t.OutputValues().Get(0)
}

func (t *IPFSWriteNode) GenerateOp() (exec.Op, error) {
return &IPFSWrite{
Input: op.InputStreams[0].Var,
FileHash: op.OutputValues[0].Var.(*exec.StringVar),
Input: t.InputStreams().Get(0).Var,
FileHash: t.OutputValues().Get(0).Var.(*exec.StringVar),
}, nil
}

func (t *IPFSWriteType) String(node *dag.Node) string {
return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node))
}
// func (t *IPFSWriteType) String() string {
// return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node))
// }

+ 84
- 66
common/pkgs/ioswitch2/ops2/ops.go View File

@@ -1,75 +1,93 @@
package ops2

import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops"
)

func formatStreamIO(node *dag.Node) string {
is := ""
for i, in := range node.InputStreams {
if i > 0 {
is += ","
}

if in == nil {
is += "."
} else {
is += fmt.Sprintf("%v", in.ID)
}
}

os := ""
for i, out := range node.OutputStreams {
if i > 0 {
os += ","
}

if out == nil {
os += "."
} else {
os += fmt.Sprintf("%v", out.ID)
}
}

if is == "" && os == "" {
return ""
}

return fmt.Sprintf("S{%s>%s}", is, os)
type GraphNodeBuilder struct {
*ops.GraphNodeBuilder
}

func NewGraphNodeBuilder() *GraphNodeBuilder {
return &GraphNodeBuilder{ops.NewGraphNodeBuilder()}
}

type FromNode interface {
dag.Node
Output() dag.StreamSlot
}

func formatValueIO(node *dag.Node) string {
is := ""
for i, in := range node.InputValues {
if i > 0 {
is += ","
}

if in == nil {
is += "."
} else {
is += fmt.Sprintf("%v", in.ID)
}
}

os := ""
for i, out := range node.OutputValues {
if i > 0 {
os += ","
}

if out == nil {
os += "."
} else {
os += fmt.Sprintf("%v", out.ID)
}
}

if is == "" && os == "" {
return ""
}

return fmt.Sprintf("V{%s>%s}", is, os)
type ToNode interface {
dag.Node
Input() dag.StreamSlot
SetInput(input *dag.StreamVar)
}

// func formatStreamIO(node *dag.Node) string {
// is := ""
// for i, in := range node.InputStreams {
// if i > 0 {
// is += ","
// }

// if in == nil {
// is += "."
// } else {
// is += fmt.Sprintf("%v", in.ID)
// }
// }

// os := ""
// for i, out := range node.OutputStreams {
// if i > 0
// os += ","
// }

// if out == nil {
// os += "."
// } else {
// os += fmt.Sprintf("%v", out.ID)
// }
// }

// if is == "" && os == "" {
// return ""
// }

// return fmt.Sprintf("S{%s>%s}", is, os)
// }

// func formatValueIO(node *dag.Node) string {
// is := ""
// for i, in := range node.InputValues {
// if i > 0 {
// is += ","
// }

// if in == nil {
// is += "."
// } else {
// is += fmt.Sprintf("%v", in.ID)
// }
// }

// os := ""
// for i, out := range node.OutputValues {
// if i > 0 {
// os += ","
// }

// if out == nil {
// os += "."
// } else {
// os += fmt.Sprintf("%v", out.ID)
// }
// }

// if is == "" && os == "" {
// return ""
// }

// return fmt.Sprintf("V{%s>%s}", is, os)
// }

+ 21
- 11
common/pkgs/ioswitch2/ops2/range.go View File

@@ -10,7 +10,6 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
)

func init() {
@@ -76,24 +75,35 @@ func (o *Range) String() string {
return fmt.Sprintf("Range(%v+%v) %v -> %v", o.Offset, o.Length, o.Input.ID, o.Output.ID)
}

type RangeType struct {
type RangeNode struct {
dag.NodeBase
Range exec.Range
}

func (t *RangeType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
dag.NodeNewOutputStream(node, &ioswitch2.VarProps{})
func (b *GraphNodeBuilder) NewRange() *RangeNode {
node := &RangeNode{}
b.AddNode(node)
return node
}

func (t *RangeType) GenerateOp(n *dag.Node) (exec.Op, error) {
func (t *RangeNode) RangeStream(input *dag.StreamVar, rng exec.Range) *dag.StreamVar {
t.InputStreams().EnsureSize(1)
input.Connect(t, 0)
t.Range = rng
output := t.Graph().NewStreamVar()
t.OutputStreams().Setup(t, output, 0)
return output
}

func (t *RangeNode) GenerateOp() (exec.Op, error) {
return &Range{
Input: n.InputStreams[0].Var,
Output: n.OutputStreams[0].Var,
Input: t.InputStreams().Get(0).Var,
Output: t.OutputStreams().Get(0).Var,
Offset: t.Range.Offset,
Length: t.Range.Length,
}, nil
}

func (t *RangeType) String(node *dag.Node) string {
return fmt.Sprintf("Range[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node))
}
// func (t *RangeType) String() string {
// return fmt.Sprintf("Range[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node))
// }

+ 178
- 190
common/pkgs/ioswitch2/parser/parser.go View File

@@ -7,7 +7,6 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops"
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/lo2"
@@ -26,16 +25,27 @@ func NewParser(ec cdssdk.ECRedundancy) *DefaultParser {
}
}

type IndexedStream struct {
Stream *dag.StreamVar
DataIndex int
}

type ParseContext struct {
Ft ioswitch2.FromTo
DAG *dag.Graph
DAG *ops2.GraphNodeBuilder
// 为了产生所有To所需的数据范围,而需要From打开的范围。
// 这个范围是基于整个文件的,且上下界都取整到条带大小的整数倍,因此上界是有可能超过文件大小的。
StreamRange exec.Range
ToNodes map[ioswitch2.To]ops2.ToNode
IndexedStreams []IndexedStream
StreamRange exec.Range
}

func (p *DefaultParser) Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error {
ctx := ParseContext{Ft: ft, DAG: dag.NewGraph()}
ctx := ParseContext{
Ft: ft,
DAG: ops2.NewGraphNodeBuilder(),
ToNodes: make(map[ioswitch2.To]ops2.ToNode),
}

// 分成两个阶段:
// 1. 基于From和To生成更多指令,初步匹配to的需求
@@ -43,7 +53,7 @@ func (p *DefaultParser) Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) erro
// 计算一下打开流的范围
p.calcStreamRange(&ctx)

err := p.extend(&ctx, ft)
err := p.extend(&ctx)
if err != nil {
return err
}
@@ -82,20 +92,16 @@ func (p *DefaultParser) Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) erro
p.generateClone(&ctx)
p.generateRange(&ctx)

return plan.Generate(ctx.DAG, blder)
return plan.Generate(ctx.DAG.Graph, blder)
}
func (p *DefaultParser) findOutputStream(ctx *ParseContext, streamIndex int) *dag.StreamVar {
var ret *dag.StreamVar
ctx.DAG.Walk(func(n *dag.Node) bool {
for _, o := range n.OutputStreams {
if o != nil && ioswitch2.SProps(o).StreamIndex == streamIndex {
ret = o
return false
}
for _, s := range ctx.IndexedStreams {
if s.DataIndex == streamIndex {
ret = s.Stream
break
}
return true
})

}
return ret
}

@@ -134,77 +140,86 @@ func (p *DefaultParser) calcStreamRange(ctx *ParseContext) {
ctx.StreamRange = rng
}

func (p *DefaultParser) extend(ctx *ParseContext, ft ioswitch2.FromTo) error {
for _, fr := range ft.Froms {
frNode, err := p.buildFromNode(ctx, &ft, fr)
func (p *DefaultParser) extend(ctx *ParseContext) error {
for _, fr := range ctx.Ft.Froms {
frNode, err := p.buildFromNode(ctx, fr)
if err != nil {
return err
}

ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
Stream: frNode.Output().Var,
DataIndex: fr.GetDataIndex(),
})

// 对于完整文件的From,生成Split指令
if fr.GetDataIndex() == -1 {
node, _ := dag.NewNode(ctx.DAG, &ops2.ChunkedSplitType{ChunkSize: p.EC.ChunkSize, OutputCount: p.EC.K}, &ioswitch2.NodeProps{})
frNode.OutputStreams[0].To(node, 0)
splitNode := ctx.DAG.NewChunkedSplit(p.EC.ChunkSize)
splitNode.Split(frNode.Output().Var, p.EC.K)
for i := 0; i < p.EC.K; i++ {
ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
Stream: splitNode.SubStream(i),
DataIndex: i,
})
}
}
}

// 如果有K个不同的文件块流,则生成Multiply指令,同时针对其生成的流,生成Join指令
ecInputStrs := make(map[int]*dag.StreamVar)
loop:
for _, o := range ctx.DAG.Nodes {
for _, s := range o.OutputStreams {
prop := ioswitch2.SProps(s)
if prop.StreamIndex >= 0 && ecInputStrs[prop.StreamIndex] == nil {
ecInputStrs[prop.StreamIndex] = s
if len(ecInputStrs) == p.EC.K {
break loop
}
for _, s := range ctx.IndexedStreams {
if s.DataIndex >= 0 && ecInputStrs[s.DataIndex] == nil {
ecInputStrs[s.DataIndex] = s.Stream
if len(ecInputStrs) == p.EC.K {
break
}
}
}

if len(ecInputStrs) == p.EC.K {
mulNode, mulType := dag.NewNode(ctx.DAG, &ops2.MultiplyType{
EC: p.EC,
}, &ioswitch2.NodeProps{})
mulNode := ctx.DAG.NewECMultiply(p.EC)

for _, s := range ecInputStrs {
mulType.AddInput(mulNode, s, ioswitch2.SProps(s).StreamIndex)
for i, s := range ecInputStrs {
mulNode.AddInput(s, i)
}
for i := 0; i < p.EC.N; i++ {
mulType.NewOutput(mulNode, i)
ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
Stream: mulNode.NewOutput(i),
DataIndex: i,
})
}

joinNode, _ := dag.NewNode(ctx.DAG, &ops2.ChunkedJoinType{
InputCount: p.EC.K,
ChunkSize: p.EC.ChunkSize,
}, &ioswitch2.NodeProps{})

joinNode := ctx.DAG.NewChunkedJoin(p.EC.ChunkSize)
for i := 0; i < p.EC.K; i++ {
// 不可能找不到流
p.findOutputStream(ctx, i).To(joinNode, i)
joinNode.AddInput(p.findOutputStream(ctx, i))
}
ioswitch2.SProps(joinNode.OutputStreams[0]).StreamIndex = -1
ctx.IndexedStreams = append(ctx.IndexedStreams, IndexedStream{
Stream: joinNode.Joined(),
DataIndex: -1,
})
}

// 为每一个To找到一个输入流
for _, to := range ft.Toes {
n, err := p.buildToNode(ctx, &ft, to)
for _, to := range ctx.Ft.Toes {
toNode, err := p.buildToNode(ctx, to)
if err != nil {
return err
}
ctx.ToNodes[to] = toNode

str := p.findOutputStream(ctx, to.GetDataIndex())
if str == nil {
return fmt.Errorf("no output stream found for data index %d", to.GetDataIndex())
}

str.To(n, 0)
toNode.SetInput(str)
}

return nil
}

func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch2.FromTo, f ioswitch2.From) (*dag.Node, error) {
func (p *DefaultParser) buildFromNode(ctx *ParseContext, f ioswitch2.From) (ops2.FromNode, error) {
var repRange exec.Range
var blkRange exec.Range

@@ -220,16 +235,10 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch2.FromTo, f

switch f := f.(type) {
case *ioswitch2.FromNode:
n, t := dag.NewNode(ctx.DAG, &ops2.IPFSReadType{
FileHash: f.FileHash,
Option: ipfs.ReadOption{
Offset: 0,
Length: -1,
},
}, &ioswitch2.NodeProps{
From: f,
t := ctx.DAG.NewIPFSRead(f.FileHash, ipfs.ReadOption{
Offset: 0,
Length: -1,
})
ioswitch2.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex

if f.DataIndex == -1 {
t.Option.Offset = repRange.Offset
@@ -244,17 +253,16 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch2.FromTo, f
}

if f.Node != nil {
n.Env.ToEnvWorker(&ioswitch2.AgentWorker{Node: *f.Node})
n.Env.Pinned = true
t.Env().ToEnvWorker(&ioswitch2.AgentWorker{Node: *f.Node})
t.Env().Pinned = true
}

return n, nil
return t, nil

case *ioswitch2.FromDriver:
n, _ := dag.NewNode(ctx.DAG, &ops.FromDriverType{Handle: f.Handle}, &ioswitch2.NodeProps{From: f})
n.Env.ToEnvDriver()
n.Env.Pinned = true
ioswitch2.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex
n := ctx.DAG.NewFromDriver(f.Handle)
n.Env().ToEnvDriver()
n.Env().Pinned = true

if f.DataIndex == -1 {
f.Handle.RangeHint.Offset = repRange.Offset
@@ -271,24 +279,19 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *ioswitch2.FromTo, f
}
}

func (p *DefaultParser) buildToNode(ctx *ParseContext, ft *ioswitch2.FromTo, t ioswitch2.To) (*dag.Node, error) {
func (p *DefaultParser) buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToNode, error) {
switch t := t.(type) {
case *ioswitch2.ToNode:
n, _ := dag.NewNode(ctx.DAG, &ops2.IPFSWriteType{
FileHashStoreKey: t.FileHashStoreKey,
Range: t.Range,
}, &ioswitch2.NodeProps{
To: t,
})
n.Env.ToEnvWorker(&ioswitch2.AgentWorker{Node: t.Node})
n.Env.Pinned = true
n := ctx.DAG.NewIPFSWrite(t.FileHashStoreKey)
n.Env().ToEnvWorker(&ioswitch2.AgentWorker{Node: t.Node})
n.Env().Pinned = true

return n, nil

case *ioswitch2.ToDriver:
n, _ := dag.NewNode(ctx.DAG, &ops.ToDriverType{Handle: t.Handle, Range: t.Range}, &ioswitch2.NodeProps{To: t})
n.Env.ToEnvDriver()
n.Env.Pinned = true
n := ctx.DAG.NewToDriver(t.Handle)
n.Env().ToEnvDriver()
n.Env().Pinned = true

return n, nil

@@ -301,15 +304,12 @@ func (p *DefaultParser) buildToNode(ctx *ParseContext, ft *ioswitch2.FromTo, t i
func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool {
changed := false

dag.WalkOnlyType[*ops2.ChunkedJoinType](ctx.DAG, func(node *dag.Node, typ *ops2.ChunkedJoinType) bool {
if len(node.OutputStreams[0].Toes) > 0 {
dag.WalkOnlyType[*ops2.ChunkedJoinNode](ctx.DAG.Graph, func(node *ops2.ChunkedJoinNode) bool {
if node.InputStreams().Len() > 0 {
return true
}

for _, in := range node.InputStreams {
in.NotTo(node)
}

node.RemoveAllInputs()
ctx.DAG.RemoveNode(node)
return true
})
@@ -320,25 +320,23 @@ func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool {
// 减少未使用的Multiply指令的输出流。如果减少到0,则删除该指令
func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool {
changed := false
dag.WalkOnlyType[*ops2.MultiplyType](ctx.DAG, func(node *dag.Node, typ *ops2.MultiplyType) bool {
for i2, out := range node.OutputStreams {
if len(out.Toes) > 0 {
dag.WalkOnlyType[*ops2.ECMultiplyNode](ctx.DAG.Graph, func(node *ops2.ECMultiplyNode) bool {
outArr := node.OutputStreams().RawArray()
for i2, out := range outArr {
if out.To().Len() > 0 {
continue
}

node.OutputStreams[i2] = nil
typ.OutputIndexes[i2] = -2
outArr[i2] = nil
node.OutputIndexes[i2] = -2
changed = true
}
node.OutputStreams = lo2.RemoveAllDefault(node.OutputStreams)
typ.OutputIndexes = lo2.RemoveAll(typ.OutputIndexes, -2)
node.OutputStreams().SetRawArray(lo2.RemoveAllDefault(outArr))
node.OutputIndexes = lo2.RemoveAll(node.OutputIndexes, -2)

// 如果所有输出流都被删除,则删除该指令
if len(node.OutputStreams) == 0 {
for _, in := range node.InputStreams {
in.NotTo(node)
}

if node.OutputStreams().Len() == 0 {
node.RemoveAllInputs()
ctx.DAG.RemoveNode(node)
changed = true
}
@@ -351,16 +349,16 @@ func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool {
// 删除未使用的Split指令
func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool {
changed := false
dag.WalkOnlyType[*ops2.ChunkedSplitType](ctx.DAG, func(node *dag.Node, typ *ops2.ChunkedSplitType) bool {
dag.WalkOnlyType[*ops2.ChunkedSplitNode](ctx.DAG.Graph, func(typ *ops2.ChunkedSplitNode) bool {
// Split出来的每一个流都没有被使用,才能删除这个指令
for _, out := range node.OutputStreams {
if len(out.Toes) > 0 {
for _, out := range typ.OutputStreams().RawArray() {
if out.To().Len() > 0 {
return true
}
}

node.InputStreams[0].NotTo(node)
ctx.DAG.RemoveNode(node)
typ.Clear()
ctx.DAG.RemoveNode(typ)
changed = true
return true
})
@@ -372,43 +370,44 @@ func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool {
func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool {
changed := false

dag.WalkOnlyType[*ops2.ChunkedSplitType](ctx.DAG, func(splitNode *dag.Node, typ *ops2.ChunkedSplitType) bool {
dag.WalkOnlyType[*ops2.ChunkedSplitNode](ctx.DAG.Graph, func(splitNode *ops2.ChunkedSplitNode) bool {
// Split指令的每一个输出都有且只有一个目的地
var joinNode *dag.Node
for _, out := range splitNode.OutputStreams {
if len(out.Toes) != 1 {
continue
var dstNode dag.Node
for _, out := range splitNode.OutputStreams().RawArray() {
if out.To().Len() != 1 {
return true
}

if joinNode == nil {
joinNode = out.Toes[0].Node
} else if joinNode != out.Toes[0].Node {
if dstNode == nil {
dstNode = out.To().Get(0).Node
} else if dstNode != out.To().Get(0).Node {
return true
}
}

if joinNode == nil {
if dstNode == nil {
return true
}

// 且这个目的地要是一个Join指令
_, ok := joinNode.Type.(*ops2.ChunkedJoinType)
joinNode, ok := dstNode.(*ops2.ChunkedJoinNode)
if !ok {
return true
}

// 同时这个Join指令的输入也必须全部来自Split指令的输出。
// 由于上面判断了Split指令的输出目的地都相同,所以这里只要判断Join指令的输入数量是否与Split指令的输出数量相同即可
if len(joinNode.InputStreams) != len(splitNode.OutputStreams) {
if joinNode.InputStreams().Len() != splitNode.OutputStreams().Len() {
return true
}

// 所有条件都满足,可以开始省略操作,将Join操作的目的地的输入流替换为Split操作的输入流:
// F->Split->Join->T 变换为:F->T
splitNode.InputStreams[0].NotTo(splitNode)
for _, out := range joinNode.OutputStreams[0].Toes {
splitNode.InputStreams[0].To(out.Node, out.SlotIndex)
splitInput := splitNode.InputStreams().Get(0)
for _, to := range joinNode.Joined().To().RawArray() {
splitInput.Connect(to.Node, to.SlotIndex)
}
splitInput.Disconnect(splitNode, 0)

// 并删除这两个指令
ctx.DAG.RemoveNode(joinNode)
@@ -426,21 +425,21 @@ func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool {
// 所以理论上不会出现有指令的位置始终无法确定的情况。
func (p *DefaultParser) pin(ctx *ParseContext) bool {
changed := false
ctx.DAG.Walk(func(node *dag.Node) bool {
if node.Env.Pinned {
ctx.DAG.Walk(func(node dag.Node) bool {
if node.Env().Pinned {
return true
}

var toEnv *dag.NodeEnv
for _, out := range node.OutputStreams {
for _, to := range out.Toes {
if to.Node.Env.Type == dag.EnvUnknown {
for _, out := range node.OutputStreams().RawArray() {
for _, to := range out.To().RawArray() {
if to.Node.Env().Type == dag.EnvUnknown {
continue
}

if toEnv == nil {
toEnv = &to.Node.Env
} else if !toEnv.Equals(to.Node.Env) {
toEnv = to.Node.Env()
} else if !toEnv.Equals(to.Node.Env()) {
toEnv = nil
break
}
@@ -448,35 +447,35 @@ func (p *DefaultParser) pin(ctx *ParseContext) bool {
}

if toEnv != nil {
if !node.Env.Equals(*toEnv) {
if !node.Env().Equals(toEnv) {
changed = true
}

node.Env = *toEnv
*node.Env() = *toEnv
return true
}

// 否则根据输入流的始发地来固定
var fromEnv *dag.NodeEnv
for _, in := range node.InputStreams {
if in.From.Node.Env.Type == dag.EnvUnknown {
for _, in := range node.InputStreams().RawArray() {
if in.From().Node.Env().Type == dag.EnvUnknown {
continue
}

if fromEnv == nil {
fromEnv = &in.From.Node.Env
} else if !fromEnv.Equals(in.From.Node.Env) {
fromEnv = in.From().Node.Env()
} else if !fromEnv.Equals(in.From().Node.Env()) {
fromEnv = nil
break
}
}

if fromEnv != nil {
if !node.Env.Equals(*fromEnv) {
if !node.Env().Equals(fromEnv) {
changed = true
}

node.Env = *fromEnv
*node.Env() = *fromEnv
}
return true
})
@@ -486,12 +485,12 @@ func (p *DefaultParser) pin(ctx *ParseContext) bool {

// 对于所有未使用的流,增加Drop指令
func (p *DefaultParser) dropUnused(ctx *ParseContext) {
ctx.DAG.Walk(func(node *dag.Node) bool {
for _, out := range node.OutputStreams {
if len(out.Toes) == 0 {
n := ctx.DAG.NewNode(&ops.DropType{}, &ioswitch2.NodeProps{})
n.Env = node.Env
out.To(n, 0)
ctx.DAG.Walk(func(node dag.Node) bool {
for _, out := range node.OutputStreams().RawArray() {
if out.To().Len() == 0 {
n := ctx.DAG.NewDropStream()
*n.Env() = *node.Env()
n.SetInput(out)
}
}
return true
@@ -500,43 +499,38 @@ func (p *DefaultParser) dropUnused(ctx *ParseContext) {

// 为IPFS写入指令存储结果
func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) {
dag.WalkOnlyType[*ops2.IPFSWriteType](ctx.DAG, func(node *dag.Node, typ *ops2.IPFSWriteType) bool {
if typ.FileHashStoreKey == "" {
dag.WalkOnlyType[*ops2.IPFSWriteNode](ctx.DAG.Graph, func(n *ops2.IPFSWriteNode) bool {
if n.FileHashStoreKey == "" {
return true
}

n, t := dag.NewNode(ctx.DAG, &ops.StoreType{
StoreKey: typ.FileHashStoreKey,
}, &ioswitch2.NodeProps{})
n.Env.ToEnvDriver()
t.Store(n, node.OutputValues[0])
storeNode := ctx.DAG.NewStore()
storeNode.Env().ToEnvDriver()

storeNode.Store(n.FileHashStoreKey, n.FileHashVar())
return true
})
}

// 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回
func (p *DefaultParser) generateRange(ctx *ParseContext) {
ctx.DAG.Walk(func(node *dag.Node) bool {
props := ioswitch2.NProps(node)
if props.To == nil {
return true
}
for i := 0; i < len(ctx.Ft.Toes); i++ {
to := ctx.Ft.Toes[i]
toNode := ctx.ToNodes[to]

toDataIdx := props.To.GetDataIndex()
toRng := props.To.GetRange()
toDataIdx := to.GetDataIndex()
toRng := to.GetRange()

if toDataIdx == -1 {
n := ctx.DAG.NewNode(&ops2.RangeType{
Range: exec.Range{
Offset: toRng.Offset - ctx.StreamRange.Offset,
Length: toRng.Length,
},
}, &ioswitch2.NodeProps{})
n.Env = node.InputStreams[0].From.Node.Env

node.InputStreams[0].To(n, 0)
node.InputStreams[0].NotTo(node)
n.OutputStreams[0].To(node, 0)
n := ctx.DAG.NewRange()
toInput := toNode.Input()
*n.Env() = *toInput.Var.From().Node.Env()
rnged := n.RangeStream(toInput.Var, exec.Range{
Offset: toRng.Offset - ctx.StreamRange.Offset,
Length: toRng.Length,
})
toInput.Var.Disconnect(toNode, toInput.Index)
toNode.SetInput(rnged)

} else {
stripSize := int64(p.EC.ChunkSize * p.EC.K)
@@ -544,54 +538,48 @@ func (p *DefaultParser) generateRange(ctx *ParseContext) {

blkStart := blkStartIdx * int64(p.EC.ChunkSize)

n := ctx.DAG.NewNode(&ops2.RangeType{
Range: exec.Range{
Offset: toRng.Offset - blkStart,
Length: toRng.Length,
},
}, &ioswitch2.NodeProps{})
n.Env = node.InputStreams[0].From.Node.Env

node.InputStreams[0].To(n, 0)
node.InputStreams[0].NotTo(node)
n.OutputStreams[0].To(node, 0)
n := ctx.DAG.NewRange()
toInput := toNode.Input()
*n.Env() = *toInput.Var.From().Node.Env()
rnged := n.RangeStream(toInput.Var, exec.Range{
Offset: toRng.Offset - blkStart,
Length: toRng.Length,
})
toInput.Var.Disconnect(toNode, toInput.Index)
toNode.SetInput(rnged)
}

return true
})
}
}

// 生成Clone指令
func (p *DefaultParser) generateClone(ctx *ParseContext) {
ctx.DAG.Walk(func(node *dag.Node) bool {
for _, out := range node.OutputStreams {
if len(out.Toes) <= 1 {
ctx.DAG.Walk(func(node dag.Node) bool {
for _, out := range node.OutputStreams().RawArray() {
if out.To().Len() <= 1 {
continue
}

n, t := dag.NewNode(ctx.DAG, &ops2.CloneStreamType{}, &ioswitch2.NodeProps{})
n.Env = node.Env
for _, to := range out.Toes {
str := t.NewOutput(n)
str.Props = &ioswitch2.VarProps{StreamIndex: ioswitch2.SProps(out).StreamIndex}
str.To(to.Node, to.SlotIndex)
t := ctx.DAG.NewCloneStream()
*t.Env() = *node.Env()
for _, to := range out.To().RawArray() {
t.NewOutput().Connect(to.Node, to.SlotIndex)
}
out.Toes = nil
out.To(n, 0)
out.To().Resize(0)
t.SetInput(out)
}

for _, out := range node.OutputValues {
if len(out.Toes) <= 1 {
for _, out := range node.OutputValues().RawArray() {
if out.To().Len() <= 1 {
continue
}

n, t := dag.NewNode(ctx.DAG, &ops2.CloneVarType{}, &ioswitch2.NodeProps{})
n.Env = node.Env
for _, to := range out.Toes {
t.NewOutput(n).To(to.Node, to.SlotIndex)
t := ctx.DAG.NewCloneValue()
*t.Env() = *node.Env()
for _, to := range out.To().RawArray() {
t.NewOutput().Connect(to.Node, to.SlotIndex)
}
out.Toes = nil
out.To(n, 0)
out.To().Resize(0)
t.SetInput(out)
}

return true


+ 0
- 17
common/pkgs/ioswitch2/utils.go View File

@@ -1,17 +0,0 @@
package ioswitch2

import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
)

func NProps(n *dag.Node) *NodeProps {
return dag.NProps[*NodeProps](n)
}

func SProps(str *dag.StreamVar) *VarProps {
return dag.SProps[*VarProps](str)
}

func VProps(v *dag.ValueVar) *VarProps {
return dag.VProps[*VarProps](v)
}

+ 3
- 5
common/pkgs/ioswitchlrc/agent_worker.go View File

@@ -5,16 +5,14 @@ import (
"io"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/types"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/serder"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
)

var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[exec.WorkerInfo](
(*AgentWorker)(nil),
)))
// var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[exec.WorkerInfo](
// (*AgentWorker)(nil),
// )))

type AgentWorker struct {
Node cdssdk.Node


+ 0
- 23
common/pkgs/ioswitchlrc/ioswitch.go View File

@@ -1,23 +0,0 @@
package ioswitchlrc

import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
)

type NodeProps struct {
From From
To To
}

type ValueVarType int

const (
StringValueVar ValueVarType = iota
SignalValueVar
)

type VarProps struct {
StreamIndex int // 流的编号,只在StreamVar上有意义
ValueType ValueVarType // 值类型,只在ValueVar上有意义
Var exec.Var // 生成Plan的时候创建的对应的Var
}

+ 55
- 29
common/pkgs/ioswitchlrc/ops2/chunked.go View File

@@ -11,7 +11,6 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/utils"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
"golang.org/x/sync/semaphore"
)

@@ -101,24 +100,40 @@ func (o *ChunkedJoin) String() string {
)
}

type ChunkedSplitType struct {
OutputCount int
ChunkSize int
type ChunkedSplitNode struct {
dag.NodeBase
ChunkSize int
}

func (t *ChunkedSplitType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
for i := 0; i < t.OutputCount; i++ {
dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{
StreamIndex: i,
})
func (b *GraphNodeBuilder) NewChunkedSplit(chunkSize int) *ChunkedSplitNode {
node := &ChunkedSplitNode{
ChunkSize: chunkSize,
}
b.AddNode(node)
return node
}

func (t *ChunkedSplitNode) Split(input *dag.StreamVar, cnt int) {
t.InputStreams().EnsureSize(1)
input.Connect(t, 0)
t.OutputStreams().Resize(cnt)
for i := 0; i < cnt; i++ {
t.OutputStreams().Setup(t, t.Graph().NewStreamVar(), i)
}
}

func (t *ChunkedSplitType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *ChunkedSplitNode) SubStream(idx int) *dag.StreamVar {
return t.OutputStreams().Get(idx)
}

func (t *ChunkedSplitNode) SplitCount() int {
return t.OutputStreams().Len()
}

func (t *ChunkedSplitNode) GenerateOp() (exec.Op, error) {
return &ChunkedSplit{
Input: op.InputStreams[0].Var,
Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar {
Input: t.InputStreams().Get(0).Var,
Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar {
return v.Var
}),
ChunkSize: t.ChunkSize,
@@ -126,32 +141,43 @@ func (t *ChunkedSplitType) GenerateOp(op *dag.Node) (exec.Op, error) {
}, nil
}

func (t *ChunkedSplitType) String(node *dag.Node) string {
return fmt.Sprintf("ChunkedSplit[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node))
// func (t *ChunkedSplitNode) String() string {
// return fmt.Sprintf("ChunkedSplit[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node))
// }

type ChunkedJoinNode struct {
dag.NodeBase
ChunkSize int
}

type ChunkedJoinType struct {
InputCount int
ChunkSize int
func (b *GraphNodeBuilder) NewChunkedJoin(chunkSize int) *ChunkedJoinNode {
node := &ChunkedJoinNode{
ChunkSize: chunkSize,
}
b.AddNode(node)
node.OutputStreams().SetupNew(node, b.Graph.NewStreamVar())
return node
}

func (t *ChunkedJoinType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, t.InputCount)
dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{
StreamIndex: -1,
})
func (t *ChunkedJoinNode) AddInput(str *dag.StreamVar) {
idx := t.InputStreams().EnlargeOne()
str.Connect(t, idx)
}

func (t *ChunkedJoinType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *ChunkedJoinNode) Joined() *dag.StreamVar {
return t.OutputStreams().Get(0)
}

func (t *ChunkedJoinNode) GenerateOp() (exec.Op, error) {
return &ChunkedJoin{
Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar {
Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar {
return v.Var
}),
Output: op.OutputStreams[0].Var,
Output: t.OutputStreams().Get(0).Var,
ChunkSize: t.ChunkSize,
}, nil
}

func (t *ChunkedJoinType) String(node *dag.Node) string {
return fmt.Sprintf("ChunkedJoin[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node))
}
// func (t *ChunkedJoinType) String() string {
// return fmt.Sprintf("ChunkedJoin[%v]%v%v", t.ChunkSize, formatStreamIO(node), formatValueIO(node))
// }

+ 45
- 23
common/pkgs/ioswitchlrc/ops2/clone.go View File

@@ -74,48 +74,70 @@ func (o *CloneVar) String() string {
return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw.GetID(), utils.FormatVarIDs(o.Cloneds))
}

type CloneStreamType struct{}
type CloneStreamType struct {
dag.NodeBase
}

func (b *GraphNodeBuilder) NewCloneStream() *CloneStreamType {
node := &CloneStreamType{}
b.AddNode(node)
return node
}

func (t *CloneStreamType) SetInput(raw *dag.StreamVar) {
t.InputStreams().EnsureSize(1)
raw.Connect(t, 0)
}

func (t *CloneStreamType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
func (t *CloneStreamType) NewOutput() *dag.StreamVar {
output := t.Graph().NewStreamVar()
t.OutputStreams().SetupNew(t, output)
return output
}

func (t *CloneStreamType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *CloneStreamType) GenerateOp() (exec.Op, error) {
return &CloneStream{
Raw: op.InputStreams[0].Var,
Cloneds: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar {
Raw: t.InputStreams().Get(0).Var,
Cloneds: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar {
return v.Var
}),
}, nil
}

func (t *CloneStreamType) NewOutput(node *dag.Node) *dag.StreamVar {
return dag.NodeNewOutputStream(node, nil)
// func (t *CloneStreamType) String() string {
// return fmt.Sprintf("CloneStream[]%v%v", formatStreamIO(node), formatValueIO(node))
// }

type CloneVarType struct {
dag.NodeBase
}

func (t *CloneStreamType) String(node *dag.Node) string {
return fmt.Sprintf("CloneStream[]%v%v", formatStreamIO(node), formatValueIO(node))
func (b *GraphNodeBuilder) NewCloneValue() *CloneVarType {
node := &CloneVarType{}
b.AddNode(node)
return node
}

type CloneVarType struct{}
func (t *CloneVarType) SetInput(raw *dag.ValueVar) {
t.InputValues().EnsureSize(1)
raw.Connect(t, 0)
}

func (t *CloneVarType) InitNode(node *dag.Node) {
dag.NodeDeclareInputValue(node, 1)
func (t *CloneVarType) NewOutput() *dag.ValueVar {
output := t.Graph().NewValueVar(t.InputValues().Get(0).Type)
t.OutputValues().SetupNew(t, output)
return output
}

func (t *CloneVarType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *CloneVarType) GenerateOp() (exec.Op, error) {
return &CloneVar{
Raw: op.InputValues[0].Var,
Cloneds: lo.Map(op.OutputValues, func(v *dag.ValueVar, idx int) exec.Var {
Raw: t.InputValues().Get(0).Var,
Cloneds: lo.Map(t.OutputValues().RawArray(), func(v *dag.ValueVar, idx int) exec.Var {
return v.Var
}),
}, nil
}

func (t *CloneVarType) NewOutput(node *dag.Node) *dag.ValueVar {
return dag.NodeNewOutputValue(node, node.InputValues[0].Type, nil)
}

func (t *CloneVarType) String(node *dag.Node) string {
return fmt.Sprintf("CloneVar[]%v%v", formatStreamIO(node), formatValueIO(node))
}
// func (t *CloneVarType) String() string {
// return fmt.Sprintf("CloneVar[]%v%v", formatStreamIO(node), formatValueIO(node))
// }

+ 62
- 40
common/pkgs/ioswitchlrc/ops2/ec.go View File

@@ -15,7 +15,6 @@ import (
"gitlink.org.cn/cloudream/common/utils/sync2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec/lrc"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
)

func init() {
@@ -115,15 +114,43 @@ func (o *GalMultiply) String() string {
)
}

type LRCConstructAnyType struct {
type LRCConstructAnyNode struct {
dag.NodeBase
LRC cdssdk.LRCRedundancy
InputIndexes []int
OutputIndexes []int
}

func (t *LRCConstructAnyType) InitNode(node *dag.Node) {}
func (b *GraphNodeBuilder) NewLRCConstructAny(lrc cdssdk.LRCRedundancy) *LRCConstructAnyNode {
node := &LRCConstructAnyNode{
LRC: lrc,
}
b.AddNode(node)
return node
}

func (t *LRCConstructAnyNode) AddInput(str *dag.StreamVar, dataIndex int) {
t.InputIndexes = append(t.InputIndexes, dataIndex)
idx := t.InputStreams().EnlargeOne()
str.Connect(t, idx)
}

func (t *LRCConstructAnyNode) RemoveAllInputs() {
for i, in := range t.InputStreams().RawArray() {
in.Disconnect(t, i)
}
t.InputStreams().Resize(0)
t.InputIndexes = nil
}

func (t *LRCConstructAnyType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *LRCConstructAnyNode) NewOutput(dataIndex int) *dag.StreamVar {
t.OutputIndexes = append(t.OutputIndexes, dataIndex)
output := t.Graph().NewStreamVar()
t.OutputStreams().SetupNew(t, output)
return output
}

func (t *LRCConstructAnyNode) GenerateOp() (exec.Op, error) {
l, err := lrc.New(t.LRC.N, t.LRC.K, t.LRC.Groups)
if err != nil {
return nil, err
@@ -135,50 +162,45 @@ func (t *LRCConstructAnyType) GenerateOp(op *dag.Node) (exec.Op, error) {

return &GalMultiply{
Coef: coef,
Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }),
Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }),
Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }),
Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }),
ChunkSize: t.LRC.ChunkSize,
}, nil
}

func (t *LRCConstructAnyType) AddInput(node *dag.Node, str *dag.StreamVar, dataIndex int) {
t.InputIndexes = append(t.InputIndexes, dataIndex)
node.InputStreams = append(node.InputStreams, str)
str.To(node, len(node.InputStreams)-1)
}
// func (t *LRCConstructAnyType) String() string {
// return fmt.Sprintf("LRCAny[]%v%v", formatStreamIO(node), formatValueIO(node))
// }

func (t *LRCConstructAnyType) RemoveAllInputs(n *dag.Node) {
for _, in := range n.InputStreams {
in.From.Node.OutputStreams[in.From.SlotIndex].NotTo(n)
}
n.InputStreams = nil
t.InputIndexes = nil
}

func (t *LRCConstructAnyType) NewOutput(node *dag.Node, dataIndex int) *dag.StreamVar {
t.OutputIndexes = append(t.OutputIndexes, dataIndex)
return dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{StreamIndex: dataIndex})
type LRCConstructGroupNode struct {
dag.NodeBase
LRC cdssdk.LRCRedundancy
TargetBlockIndex int
}

func (t *LRCConstructAnyType) String(node *dag.Node) string {
return fmt.Sprintf("LRCAny[]%v%v", formatStreamIO(node), formatValueIO(node))
func (b *GraphNodeBuilder) NewLRCConstructGroup(lrc cdssdk.LRCRedundancy) *LRCConstructGroupNode {
node := &LRCConstructGroupNode{
LRC: lrc,
}
b.AddNode(node)
return node
}

type LRCConstructGroupType struct {
LRC cdssdk.LRCRedundancy
TargetBlockIndex int
}
func (t *LRCConstructGroupNode) SetupForTarget(blockIdx int, inputs []*dag.StreamVar) *dag.StreamVar {
t.TargetBlockIndex = blockIdx

func (t *LRCConstructGroupType) InitNode(node *dag.Node) {
dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{
StreamIndex: t.TargetBlockIndex,
})
t.InputStreams().Resize(0)
for _, in := range inputs {
idx := t.InputStreams().EnlargeOne()
in.Connect(t, idx)
}

grpIdx := t.LRC.FindGroup(t.TargetBlockIndex)
dag.NodeDeclareInputStream(node, t.LRC.Groups[grpIdx])
output := t.Graph().NewStreamVar()
t.OutputStreams().Setup(t, output, 0)
return output
}

func (t *LRCConstructGroupType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *LRCConstructGroupNode) GenerateOp() (exec.Op, error) {
l, err := lrc.New(t.LRC.N, t.LRC.K, t.LRC.Groups)
if err != nil {
return nil, err
@@ -190,12 +212,12 @@ func (t *LRCConstructGroupType) GenerateOp(op *dag.Node) (exec.Op, error) {

return &GalMultiply{
Coef: coef,
Inputs: lo.Map(op.InputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }),
Outputs: lo.Map(op.OutputStreams, func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }),
Inputs: lo.Map(t.InputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }),
Outputs: lo.Map(t.OutputStreams().RawArray(), func(v *dag.StreamVar, idx int) *exec.StreamVar { return v.Var }),
ChunkSize: t.LRC.ChunkSize,
}, nil
}

func (t *LRCConstructGroupType) String(node *dag.Node) string {
return fmt.Sprintf("LRCGroup[]%v%v", formatStreamIO(node), formatValueIO(node))
}
// func (t *LRCConstructGroupType) String() string {
// return fmt.Sprintf("LRCGroup[]%v%v", formatStreamIO(node), formatValueIO(node))
// }

+ 53
- 20
common/pkgs/ioswitchlrc/ops2/ipfs.go View File

@@ -12,7 +12,6 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/io2"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
)

func init() {
@@ -94,44 +93,78 @@ func (o *IPFSWrite) String() string {
return fmt.Sprintf("IPFSWrite %v -> %v", o.Input.ID, o.FileHash.ID)
}

type IPFSReadType struct {
type IPFSReadNode struct {
dag.NodeBase
FileHash string
Option ipfs.ReadOption
}

func (t *IPFSReadType) InitNode(node *dag.Node) {
dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{})
func (b *GraphNodeBuilder) NewIPFSRead(fileHash string, option ipfs.ReadOption) *IPFSReadNode {
node := &IPFSReadNode{
FileHash: fileHash,
Option: option,
}
b.AddNode(node)
node.OutputStreams().SetupNew(node, b.NewStreamVar())
return node
}

func (t *IPFSReadNode) Output() dag.StreamSlot {
return dag.StreamSlot{
Var: t.OutputStreams().Get(0),
Index: 0,
}
}

func (t *IPFSReadType) GenerateOp(n *dag.Node) (exec.Op, error) {
func (t *IPFSReadNode) GenerateOp() (exec.Op, error) {
return &IPFSRead{
Output: n.OutputStreams[0].Var,
Output: t.OutputStreams().Get(0).Var,
FileHash: t.FileHash,
Option: t.Option,
}, nil
}

func (t *IPFSReadType) String(node *dag.Node) string {
return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node))
}
// func (t *IPFSReadType) String() string {
// return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node))
// }

type IPFSWriteType struct {
type IPFSWriteNode struct {
dag.NodeBase
FileHashStoreKey string
Range exec.Range
}

func (t *IPFSWriteType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
dag.NodeNewOutputValue(node, dag.StringValueVar, &ioswitchlrc.VarProps{})
func (b *GraphNodeBuilder) NewIPFSWrite(fileHashStoreKey string) *IPFSWriteNode {
node := &IPFSWriteNode{
FileHashStoreKey: fileHashStoreKey,
}
b.AddNode(node)
return node
}

func (t *IPFSWriteNode) SetInput(input *dag.StreamVar) {
t.InputStreams().EnsureSize(1)
input.Connect(t, 0)
t.OutputValues().SetupNew(t, t.Graph().NewValueVar(dag.StringValueVar))
}

func (t *IPFSWriteNode) Input() dag.StreamSlot {
return dag.StreamSlot{
Var: t.InputStreams().Get(0),
Index: 0,
}
}

func (t *IPFSWriteType) GenerateOp(op *dag.Node) (exec.Op, error) {
func (t *IPFSWriteNode) FileHashVar() *dag.ValueVar {
return t.OutputValues().Get(0)
}

func (t *IPFSWriteNode) GenerateOp() (exec.Op, error) {
return &IPFSWrite{
Input: op.InputStreams[0].Var,
FileHash: op.OutputValues[0].Var.(*exec.StringVar),
Input: t.InputStreams().Get(0).Var,
FileHash: t.OutputValues().Get(0).Var.(*exec.StringVar),
}, nil
}

func (t *IPFSWriteType) String(node *dag.Node) string {
return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node))
}
// func (t *IPFSWriteType) String() string {
// return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node))
// }

+ 84
- 66
common/pkgs/ioswitchlrc/ops2/ops.go View File

@@ -1,75 +1,93 @@
package ops2

import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops"
)

func formatStreamIO(node *dag.Node) string {
is := ""
for i, in := range node.InputStreams {
if i > 0 {
is += ","
}

if in == nil {
is += "."
} else {
is += fmt.Sprintf("%v", in.ID)
}
}

os := ""
for i, out := range node.OutputStreams {
if i > 0 {
os += ","
}

if out == nil {
os += "."
} else {
os += fmt.Sprintf("%v", out.ID)
}
}

if is == "" && os == "" {
return ""
}

return fmt.Sprintf("S{%s>%s}", is, os)
type GraphNodeBuilder struct {
*ops.GraphNodeBuilder
}

func NewGraphNodeBuilder() *GraphNodeBuilder {
return &GraphNodeBuilder{ops.NewGraphNodeBuilder()}
}

type FromNode interface {
dag.Node
Output() dag.StreamSlot
}

func formatValueIO(node *dag.Node) string {
is := ""
for i, in := range node.InputValues {
if i > 0 {
is += ","
}

if in == nil {
is += "."
} else {
is += fmt.Sprintf("%v", in.ID)
}
}

os := ""
for i, out := range node.OutputValues {
if i > 0 {
os += ","
}

if out == nil {
os += "."
} else {
os += fmt.Sprintf("%v", out.ID)
}
}

if is == "" && os == "" {
return ""
}

return fmt.Sprintf("V{%s>%s}", is, os)
type ToNode interface {
dag.Node
Input() dag.StreamSlot
SetInput(input *dag.StreamVar)
}

// func formatStreamIO(node *dag.Node) string {
// is := ""
// for i, in := range node.InputStreams {
// if i > 0 {
// is += ","
// }

// if in == nil {
// is += "."
// } else {
// is += fmt.Sprintf("%v", in.ID)
// }
// }

// os := ""
// for i, out := range node.OutputStreams {
// if i > 0 {
// os += ","
// }

// if out == nil {
// os += "."
// } else {
// os += fmt.Sprintf("%v", out.ID)
// }
// }

// if is == "" && os == "" {
// return ""
// }

// return fmt.Sprintf("S{%s>%s}", is, os)
// }

// func formatValueIO(node *dag.Node) string {
// is := ""
// for i, in := range node.InputValues {
// if i > 0 {
// is += ","
// }

// if in == nil {
// is += "."
// } else {
// is += fmt.Sprintf("%v", in.ID)
// }
// }

// os := ""
// for i, out := range node.OutputValues {
// if i > 0 {
// os += ","
// }

// if out == nil {
// os += "."
// } else {
// os += fmt.Sprintf("%v", out.ID)
// }
// }

// if is == "" && os == "" {
// return ""
// }

// return fmt.Sprintf("V{%s>%s}", is, os)
// }

+ 21
- 11
common/pkgs/ioswitchlrc/ops2/range.go View File

@@ -10,7 +10,6 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
)

func init() {
@@ -76,24 +75,35 @@ func (o *Range) String() string {
return fmt.Sprintf("Range(%v+%v) %v -> %v", o.Offset, o.Length, o.Input.ID, o.Output.ID)
}

type RangeType struct {
type RangeNode struct {
dag.NodeBase
Range exec.Range
}

func (t *RangeType) InitNode(node *dag.Node) {
dag.NodeDeclareInputStream(node, 1)
dag.NodeNewOutputStream(node, &ioswitchlrc.VarProps{})
func (b *GraphNodeBuilder) NewRange() *RangeNode {
node := &RangeNode{}
b.AddNode(node)
return node
}

func (t *RangeType) GenerateOp(n *dag.Node) (exec.Op, error) {
func (t *RangeNode) RangeStream(input *dag.StreamVar, rng exec.Range) *dag.StreamVar {
t.InputStreams().EnsureSize(1)
input.Connect(t, 0)
t.Range = rng
output := t.Graph().NewStreamVar()
t.OutputStreams().Setup(t, output, 0)
return output
}

func (t *RangeNode) GenerateOp() (exec.Op, error) {
return &Range{
Input: n.InputStreams[0].Var,
Output: n.OutputStreams[0].Var,
Input: t.InputStreams().Get(0).Var,
Output: t.OutputStreams().Get(0).Var,
Offset: t.Range.Offset,
Length: t.Range.Length,
}, nil
}

func (t *RangeType) String(node *dag.Node) string {
return fmt.Sprintf("Range[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node))
}
// func (t *RangeType) String() string {
// return fmt.Sprintf("Range[%v+%v]%v%v", t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node))
// }

+ 56
- 58
common/pkgs/ioswitchlrc/parser/generator.go View File

@@ -13,8 +13,9 @@ import (

type GenerateContext struct {
LRC cdssdk.LRCRedundancy
DAG *dag.Graph
Toes []ioswitchlrc.To
DAG *ops2.GraphNodeBuilder
To []ioswitchlrc.To
ToNodes map[ioswitchlrc.To]ops2.ToNode
StreamRange exec.Range
}

@@ -25,9 +26,10 @@ func Encode(fr ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder)
}

ctx := GenerateContext{
LRC: cdssdk.DefaultLRCRedundancy,
DAG: dag.NewGraph(),
Toes: toes,
LRC: cdssdk.DefaultLRCRedundancy,
DAG: ops2.NewGraphNodeBuilder(),
To: toes,
ToNodes: make(map[ioswitchlrc.To]ops2.ToNode),
}

calcStreamRange(&ctx)
@@ -46,7 +48,7 @@ func Encode(fr ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder)
generateClone(&ctx)
generateRange(&ctx)

return plan.Generate(ctx.DAG, blder)
return plan.Generate(ctx.DAG.Graph, blder)
}

func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlrc.To) error {
@@ -66,9 +68,9 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr
if err != nil {
return fmt.Errorf("building to node: %w", err)
}
ctx.ToNodes[to] = toNode

frNode.OutputStreams[0].To(toNode, 0)

toNode.SetInput(frNode.Output().Var)
} else if idx < ctx.LRC.K {
dataToes = append(dataToes, to)
} else {
@@ -81,19 +83,17 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr
}

// 需要文件块,则生成Split指令
splitNode := ctx.DAG.NewNode(&ops2.ChunkedSplitType{
OutputCount: ctx.LRC.K,
ChunkSize: ctx.LRC.ChunkSize,
}, &ioswitchlrc.NodeProps{})
frNode.OutputStreams[0].To(splitNode, 0)
splitNode := ctx.DAG.NewChunkedSplit(ctx.LRC.ChunkSize)
splitNode.Split(frNode.Output().Var, ctx.LRC.K)

for _, to := range dataToes {
toNode, err := buildToNode(ctx, to)
if err != nil {
return fmt.Errorf("building to node: %w", err)
}
ctx.ToNodes[to] = toNode

splitNode.OutputStreams[to.GetDataIndex()].To(toNode, 0)
toNode.SetInput(splitNode.SubStream(to.GetDataIndex()))
}

if len(parityToes) == 0 {
@@ -102,12 +102,10 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr

// 需要校验块,则进一步生成Construct指令

conNode, conType := dag.NewNode(ctx.DAG, &ops2.LRCConstructAnyType{
LRC: ctx.LRC,
}, &ioswitchlrc.NodeProps{})
conType := ctx.DAG.NewLRCConstructAny(ctx.LRC)

for _, out := range splitNode.OutputStreams {
conType.AddInput(conNode, out, ioswitchlrc.SProps(out).StreamIndex)
for i, out := range splitNode.OutputStreams().RawArray() {
conType.AddInput(out, i)
}

for _, to := range parityToes {
@@ -115,8 +113,9 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr
if err != nil {
return fmt.Errorf("building to node: %w", err)
}
ctx.ToNodes[to] = toNode

conType.NewOutput(conNode, to.GetDataIndex()).To(toNode, 0)
toNode.SetInput(conType.NewOutput(to.GetDataIndex()))
}
return nil
}
@@ -124,9 +123,10 @@ func buildDAGEncode(ctx *GenerateContext, fr ioswitchlrc.From, toes []ioswitchlr
// 提供数据块+编码块中的k个块,重建任意块,包括完整文件。
func ReconstructAny(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) error {
ctx := GenerateContext{
LRC: cdssdk.DefaultLRCRedundancy,
DAG: dag.NewGraph(),
Toes: toes,
LRC: cdssdk.DefaultLRCRedundancy,
DAG: ops2.NewGraphNodeBuilder(),
To: toes,
ToNodes: make(map[ioswitchlrc.To]ops2.ToNode),
}

calcStreamRange(&ctx)
@@ -145,11 +145,11 @@ func ReconstructAny(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.P
generateClone(&ctx)
generateRange(&ctx)

return plan.Generate(ctx.DAG, blder)
return plan.Generate(ctx.DAG.Graph, blder)
}

func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes []ioswitchlrc.To) error {
frNodes := make(map[int]*dag.Node)
frNodes := make(map[int]ops2.FromNode)
for _, fr := range frs {
frNode, err := buildFromNode(ctx, fr)
if err != nil {
@@ -167,12 +167,13 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [
toIdx := to.GetDataIndex()
fr := frNodes[toIdx]
if fr != nil {
node, err := buildToNode(ctx, to)
toNode, err := buildToNode(ctx, to)
if err != nil {
return fmt.Errorf("building to node: %w", err)
}
ctx.ToNodes[to] = toNode

fr.OutputStreams[0].To(node, 0)
toNode.SetInput(fr.Output().Var)
continue
}

@@ -189,12 +190,9 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [

// 生成Construct指令来恢复缺少的块

conNode, conType := dag.NewNode(ctx.DAG, &ops2.LRCConstructAnyType{
LRC: ctx.LRC,
}, &ioswitchlrc.NodeProps{})

for _, fr := range frNodes {
conType.AddInput(conNode, fr.OutputStreams[0], ioswitchlrc.SProps(fr.OutputStreams[0]).StreamIndex)
conNode := ctx.DAG.NewLRCConstructAny(ctx.LRC)
for i, fr := range frNodes {
conNode.AddInput(fr.Output().Var, i)
}

for _, to := range missedToes {
@@ -202,8 +200,9 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [
if err != nil {
return fmt.Errorf("building to node: %w", err)
}
ctx.ToNodes[to] = toNode

conType.NewOutput(conNode, to.GetDataIndex()).To(toNode, 0)
toNode.SetInput(conNode.NewOutput(to.GetDataIndex()))
}

if len(completeToes) == 0 {
@@ -212,17 +211,14 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [

// 需要完整文件,则生成Join指令

joinNode := ctx.DAG.NewNode(&ops2.ChunkedJoinType{
InputCount: ctx.LRC.K,
ChunkSize: ctx.LRC.ChunkSize,
}, &ioswitchlrc.NodeProps{})
joinNode := ctx.DAG.NewChunkedJoin(ctx.LRC.ChunkSize)

for i := 0; i < ctx.LRC.K; i++ {
n := frNodes[i]
if n == nil {
conType.NewOutput(conNode, i).To(joinNode, i)
fr := frNodes[i]
if fr == nil {
joinNode.AddInput(conNode.NewOutput(i))
} else {
n.OutputStreams[0].To(joinNode, i)
joinNode.AddInput(fr.Output().Var)
}
}

@@ -231,13 +227,14 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [
if err != nil {
return fmt.Errorf("building to node: %w", err)
}
ctx.ToNodes[to] = toNode

joinNode.OutputStreams[0].To(toNode, 0)
toNode.SetInput(joinNode.Joined())
}

// 如果不需要Construct任何块,则删除这个节点
if len(conNode.OutputStreams) == 0 {
conType.RemoveAllInputs(conNode)
if conNode.OutputStreams().Len() == 0 {
conNode.RemoveAllInputs()
ctx.DAG.RemoveNode(conNode)
}

@@ -247,9 +244,10 @@ func buildDAGReconstructAny(ctx *GenerateContext, frs []ioswitchlrc.From, toes [
// 输入同一组的多个块,恢复出剩下缺少的一个块。
func ReconstructGroup(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec.PlanBuilder) error {
ctx := GenerateContext{
LRC: cdssdk.DefaultLRCRedundancy,
DAG: dag.NewGraph(),
Toes: toes,
LRC: cdssdk.DefaultLRCRedundancy,
DAG: ops2.NewGraphNodeBuilder(),
To: toes,
ToNodes: make(map[ioswitchlrc.To]ops2.ToNode),
}

calcStreamRange(&ctx)
@@ -268,33 +266,33 @@ func ReconstructGroup(frs []ioswitchlrc.From, toes []ioswitchlrc.To, blder *exec
generateClone(&ctx)
generateRange(&ctx)

return plan.Generate(ctx.DAG, blder)
return plan.Generate(ctx.DAG.Graph, blder)
}

func buildDAGReconstructGroup(ctx *GenerateContext, frs []ioswitchlrc.From, toes []ioswitchlrc.To) error {
missedGrpIdx := toes[0].GetDataIndex()

conNode := ctx.DAG.NewNode(&ops2.LRCConstructGroupType{
LRC: ctx.LRC,
TargetBlockIndex: missedGrpIdx,
}, &ioswitchlrc.NodeProps{})

for i, fr := range frs {
var inputs []*dag.StreamVar
for _, fr := range frs {
frNode, err := buildFromNode(ctx, fr)
if err != nil {
return fmt.Errorf("building from node: %w", err)
}

frNode.OutputStreams[0].To(conNode, i)
inputs = append(inputs, frNode.Output().Var)
}

missedGrpIdx := toes[0].GetDataIndex()
conNode := ctx.DAG.NewLRCConstructGroup(ctx.LRC)
missedBlk := conNode.SetupForTarget(missedGrpIdx, inputs)

for _, to := range toes {
toNode, err := buildToNode(ctx, to)
if err != nil {
return fmt.Errorf("building to node: %w", err)
}
ctx.ToNodes[to] = toNode

conNode.OutputStreams[0].To(toNode, 0)
toNode.SetInput(missedBlk)
}

return nil


+ 85
- 110
common/pkgs/ioswitchlrc/parser/passes.go View File

@@ -6,7 +6,6 @@ import (

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops"
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
@@ -21,7 +20,7 @@ func calcStreamRange(ctx *GenerateContext) {
Offset: math.MaxInt64,
}

for _, to := range ctx.Toes {
for _, to := range ctx.To {
if to.GetDataIndex() == -1 {
toRng := to.GetRange()
rng.ExtendStart(math2.Floor(toRng.Offset, stripSize))
@@ -48,7 +47,7 @@ func calcStreamRange(ctx *GenerateContext) {
ctx.StreamRange = rng
}

func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (*dag.Node, error) {
func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (ops2.FromNode, error) {
var repRange exec.Range
var blkRange exec.Range

@@ -64,16 +63,10 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (*dag.Node, error)

switch f := f.(type) {
case *ioswitchlrc.FromNode:
n, t := dag.NewNode(ctx.DAG, &ops2.IPFSReadType{
FileHash: f.FileHash,
Option: ipfs.ReadOption{
Offset: 0,
Length: -1,
},
}, &ioswitchlrc.NodeProps{
From: f,
t := ctx.DAG.NewIPFSRead(f.FileHash, ipfs.ReadOption{
Offset: 0,
Length: -1,
})
ioswitchlrc.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex

if f.DataIndex == -1 {
t.Option.Offset = repRange.Offset
@@ -88,17 +81,16 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (*dag.Node, error)
}

if f.Node != nil {
n.Env.ToEnvWorker(&ioswitchlrc.AgentWorker{Node: *f.Node})
n.Env.Pinned = true
t.Env().ToEnvWorker(&ioswitchlrc.AgentWorker{Node: *f.Node})
t.Env().Pinned = true
}

return n, nil
return t, nil

case *ioswitchlrc.FromDriver:
n, _ := dag.NewNode(ctx.DAG, &ops.FromDriverType{Handle: f.Handle}, &ioswitchlrc.NodeProps{From: f})
n.Env.ToEnvDriver()
n.Env.Pinned = true
ioswitchlrc.SProps(n.OutputStreams[0]).StreamIndex = f.DataIndex
n := ctx.DAG.NewFromDriver(f.Handle)
n.Env().ToEnvDriver()
n.Env().Pinned = true

if f.DataIndex == -1 {
f.Handle.RangeHint.Offset = repRange.Offset
@@ -115,24 +107,19 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (*dag.Node, error)
}
}

func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (*dag.Node, error) {
func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (ops2.ToNode, error) {
switch t := t.(type) {
case *ioswitchlrc.ToNode:
n, _ := dag.NewNode(ctx.DAG, &ops2.IPFSWriteType{
FileHashStoreKey: t.FileHashStoreKey,
Range: t.Range,
}, &ioswitchlrc.NodeProps{
To: t,
})
n.Env.ToEnvWorker(&ioswitchlrc.AgentWorker{Node: t.Node})
n.Env.Pinned = true
n := ctx.DAG.NewIPFSWrite(t.FileHashStoreKey)
n.Env().ToEnvWorker(&ioswitchlrc.AgentWorker{Node: t.Node})
n.Env().Pinned = true

return n, nil

case *ioswitchlrc.ToDriver:
n, _ := dag.NewNode(ctx.DAG, &ops.ToDriverType{Handle: t.Handle, Range: t.Range}, &ioswitchlrc.NodeProps{To: t})
n.Env.ToEnvDriver()
n.Env.Pinned = true
n := ctx.DAG.NewToDriver(t.Handle)
n.Env().ToEnvDriver()
n.Env().Pinned = true

return n, nil

@@ -146,21 +133,21 @@ func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (*dag.Node, error) {
// 所以理论上不会出现有指令的位置始终无法确定的情况。
func pin(ctx *GenerateContext) bool {
changed := false
ctx.DAG.Walk(func(node *dag.Node) bool {
if node.Env.Pinned {
ctx.DAG.Walk(func(node dag.Node) bool {
if node.Env().Pinned {
return true
}

var toEnv *dag.NodeEnv
for _, out := range node.OutputStreams {
for _, to := range out.Toes {
if to.Node.Env.Type == dag.EnvUnknown {
for _, out := range node.OutputStreams().RawArray() {
for _, to := range out.To().RawArray() {
if to.Node.Env().Type == dag.EnvUnknown {
continue
}

if toEnv == nil {
toEnv = &to.Node.Env
} else if !toEnv.Equals(to.Node.Env) {
toEnv = to.Node.Env()
} else if !toEnv.Equals(to.Node.Env()) {
toEnv = nil
break
}
@@ -168,35 +155,35 @@ func pin(ctx *GenerateContext) bool {
}

if toEnv != nil {
if !node.Env.Equals(*toEnv) {
if !node.Env().Equals(toEnv) {
changed = true
}

node.Env = *toEnv
*node.Env() = *toEnv
return true
}

// 否则根据输入流的始发地来固定
var fromEnv *dag.NodeEnv
for _, in := range node.InputStreams {
if in.From.Node.Env.Type == dag.EnvUnknown {
for _, in := range node.InputStreams().RawArray() {
if in.From().Node.Env().Type == dag.EnvUnknown {
continue
}

if fromEnv == nil {
fromEnv = &in.From.Node.Env
} else if !fromEnv.Equals(in.From.Node.Env) {
fromEnv = in.From().Node.Env()
} else if !fromEnv.Equals(in.From().Node.Env()) {
fromEnv = nil
break
}
}

if fromEnv != nil {
if !node.Env.Equals(*fromEnv) {
if !node.Env().Equals(fromEnv) {
changed = true
}

node.Env = *fromEnv
*node.Env() = *fromEnv
}
return true
})
@@ -206,12 +193,12 @@ func pin(ctx *GenerateContext) bool {

// 对于所有未使用的流,增加Drop指令
func dropUnused(ctx *GenerateContext) {
ctx.DAG.Walk(func(node *dag.Node) bool {
for _, out := range node.OutputStreams {
if len(out.Toes) == 0 {
n := ctx.DAG.NewNode(&ops.DropType{}, &ioswitchlrc.NodeProps{})
n.Env = node.Env
out.To(n, 0)
ctx.DAG.Walk(func(node dag.Node) bool {
for _, out := range node.OutputStreams().RawArray() {
if out.To().Len() == 0 {
n := ctx.DAG.NewDropStream()
*n.Env() = *node.Env()
n.SetInput(out)
}
}
return true
@@ -220,44 +207,38 @@ func dropUnused(ctx *GenerateContext) {

// 为IPFS写入指令存储结果
func storeIPFSWriteResult(ctx *GenerateContext) {
dag.WalkOnlyType[*ops2.IPFSWriteType](ctx.DAG, func(node *dag.Node, typ *ops2.IPFSWriteType) bool {
if typ.FileHashStoreKey == "" {
dag.WalkOnlyType[*ops2.IPFSWriteNode](ctx.DAG.Graph, func(n *ops2.IPFSWriteNode) bool {
if n.FileHashStoreKey == "" {
return true
}

n, t := dag.NewNode(ctx.DAG, &ops.StoreType{
StoreKey: typ.FileHashStoreKey,
}, &ioswitchlrc.NodeProps{})
n.Env.ToEnvDriver()
storeNode := ctx.DAG.NewStore()
storeNode.Env().ToEnvDriver()

t.Store(n, node.OutputValues[0])
storeNode.Store(n.FileHashStoreKey, n.FileHashVar())
return true
})
}

// 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回
func generateRange(ctx *GenerateContext) {
ctx.DAG.Walk(func(node *dag.Node) bool {
props := ioswitchlrc.NProps(node)
if props.To == nil {
return true
}
for i := 0; i < len(ctx.To); i++ {
to := ctx.To[i]
toNode := ctx.ToNodes[to]

toDataIdx := props.To.GetDataIndex()
toRng := props.To.GetRange()
toDataIdx := to.GetDataIndex()
toRng := to.GetRange()

if toDataIdx == -1 {
n := ctx.DAG.NewNode(&ops2.RangeType{
Range: exec.Range{
Offset: toRng.Offset - ctx.StreamRange.Offset,
Length: toRng.Length,
},
}, &ioswitchlrc.NodeProps{})
n.Env = node.InputStreams[0].From.Node.Env

node.InputStreams[0].To(n, 0)
node.InputStreams[0].NotTo(node)
n.OutputStreams[0].To(node, 0)
n := ctx.DAG.NewRange()
toInput := toNode.Input()
*n.Env() = *toInput.Var.From().Node.Env()
rnged := n.RangeStream(toInput.Var, exec.Range{
Offset: toRng.Offset - ctx.StreamRange.Offset,
Length: toRng.Length,
})
toInput.Var.Disconnect(toNode, toInput.Index)
toNode.SetInput(rnged)

} else {
stripSize := int64(ctx.LRC.ChunkSize * ctx.LRC.K)
@@ -265,54 +246,48 @@ func generateRange(ctx *GenerateContext) {

blkStart := blkStartIdx * int64(ctx.LRC.ChunkSize)

n := ctx.DAG.NewNode(&ops2.RangeType{
Range: exec.Range{
Offset: toRng.Offset - blkStart,
Length: toRng.Length,
},
}, &ioswitchlrc.NodeProps{})
n.Env = node.InputStreams[0].From.Node.Env

node.InputStreams[0].To(n, 0)
node.InputStreams[0].NotTo(node)
n.OutputStreams[0].To(node, 0)
n := ctx.DAG.NewRange()
toInput := toNode.Input()
*n.Env() = *toInput.Var.From().Node.Env()
rnged := n.RangeStream(toInput.Var, exec.Range{
Offset: toRng.Offset - blkStart,
Length: toRng.Length,
})
toInput.Var.Disconnect(toNode, toInput.Index)
toNode.SetInput(rnged)
}

return true
})
}
}

// 生成Clone指令
func generateClone(ctx *GenerateContext) {
ctx.DAG.Walk(func(node *dag.Node) bool {
for _, out := range node.OutputStreams {
if len(out.Toes) <= 1 {
ctx.DAG.Walk(func(node dag.Node) bool {
for _, out := range node.OutputStreams().RawArray() {
if out.To().Len() <= 1 {
continue
}

n, t := dag.NewNode(ctx.DAG, &ops2.CloneStreamType{}, &ioswitchlrc.NodeProps{})
n.Env = node.Env
for _, to := range out.Toes {
str := t.NewOutput(n)
str.Props = &ioswitchlrc.VarProps{StreamIndex: ioswitchlrc.SProps(out).StreamIndex}
str.To(to.Node, to.SlotIndex)
t := ctx.DAG.NewCloneStream()
*t.Env() = *node.Env()
for _, to := range out.To().RawArray() {
t.NewOutput().Connect(to.Node, to.SlotIndex)
}
out.Toes = nil
out.To(n, 0)
out.To().Resize(0)
t.SetInput(out)
}

for _, out := range node.OutputValues {
if len(out.Toes) <= 1 {
for _, out := range node.OutputValues().RawArray() {
if out.To().Len() <= 1 {
continue
}

n, t := dag.NewNode(ctx.DAG, &ops2.CloneVarType{}, &ioswitchlrc.NodeProps{})
n.Env = node.Env
for _, to := range out.Toes {
t.NewOutput(node).To(to.Node, to.SlotIndex)
t := ctx.DAG.NewCloneValue()
*t.Env() = *node.Env()
for _, to := range out.To().RawArray() {
t.NewOutput().Connect(to.Node, to.SlotIndex)
}
out.Toes = nil
out.To(n, 0)
out.To().Resize(0)
t.SetInput(out)
}

return true


+ 0
- 17
common/pkgs/ioswitchlrc/utils.go View File

@@ -1,17 +0,0 @@
package ioswitchlrc

import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
)

func NProps(n *dag.Node) *NodeProps {
return dag.NProps[*NodeProps](n)
}

func SProps(str *dag.StreamVar) *VarProps {
return dag.SProps[*VarProps](str)
}

func VProps(v *dag.ValueVar) *VarProps {
return dag.VProps[*VarProps](v)
}

+ 33
- 0
common/pkgs/mq/coordinator/package.go View File

@@ -23,6 +23,8 @@ type PackageService interface {
GetPackageCachedNodes(msg *GetPackageCachedNodes) (*GetPackageCachedNodesResp, *mq.CodeMessage)

GetPackageLoadedNodes(msg *GetPackageLoadedNodes) (*GetPackageLoadedNodesResp, *mq.CodeMessage)

AddPackageAccessStatCounter(msg *AddPackageAccessStatCounter) (*AddPackageAccessStatCounterResp, *mq.CodeMessage)
}

// 获取Package基本信息
@@ -254,3 +256,34 @@ func NewGetPackageLoadedNodesResp(nodeIDs []cdssdk.NodeID) *GetPackageLoadedNode
func (client *Client) GetPackageLoadedNodes(msg *GetPackageLoadedNodes) (*GetPackageLoadedNodesResp, error) {
return mq.Request(Service.GetPackageLoadedNodes, client.rabbitCli, msg)
}

// 更新Package访问统计中的计数值
var _ = Register(Service.AddPackageAccessStatCounter)

// AddPackageAccessStatCounter is the MQ request that reports a batch of
// package access counter increments to the coordinator.
type AddPackageAccessStatCounter struct {
	mq.MessageBodyBase
	Entries []AddPackageAccessStatCounterEntry `json:"entries"`
}

// AddPackageAccessStatCounterEntry is a single increment: Value is added to
// the access counter of the (PackageID, NodeID) pair.
type AddPackageAccessStatCounterEntry struct {
	PackageID cdssdk.PackageID `json:"packageID" db:"PackageID"`
	NodeID    cdssdk.NodeID    `json:"nodeID" db:"NodeID"`
	Value     float64          `json:"value" db:"Value"`
}

// AddPackageAccessStatCounterResp is the (empty) reply to an
// AddPackageAccessStatCounter request.
type AddPackageAccessStatCounterResp struct {
	mq.MessageBodyBase
}

// NewAddPackageAccessStatCounter builds a request message carrying the given
// counter increment entries.
func NewAddPackageAccessStatCounter(entries []AddPackageAccessStatCounterEntry) *AddPackageAccessStatCounter {
	msg := AddPackageAccessStatCounter{}
	msg.Entries = entries
	return &msg
}

// NewAddPackageAccessStatCounterResp builds an empty success reply.
func NewAddPackageAccessStatCounterResp() *AddPackageAccessStatCounterResp {
	return new(AddPackageAccessStatCounterResp)
}

// AddPackageAccessStatCounter sends the request to the coordinator over MQ
// and waits for its reply.
func (client *Client) AddPackageAccessStatCounter(msg *AddPackageAccessStatCounter) (*AddPackageAccessStatCounterResp, error) {
	return mq.Request(Service.AddPackageAccessStatCounter, client.rabbitCli, msg)
}

+ 0
- 35
common/pkgs/mq/coordinator/storage.go View File

@@ -1,8 +1,6 @@
package coordinator

import (
"time"

"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
@@ -15,8 +13,6 @@ type StorageService interface {
GetStorageByName(msg *GetStorageByName) (*GetStorageByNameResp, *mq.CodeMessage)

StoragePackageLoaded(msg *StoragePackageLoaded) (*StoragePackageLoadedResp, *mq.CodeMessage)

GetPackageLoadLogDetails(msg *GetPackageLoadLogDetails) (*GetPackageLoadLogDetailsResp, *mq.CodeMessage)
}

// 获取Storage信息
@@ -102,34 +98,3 @@ func NewStoragePackageLoadedResp() *StoragePackageLoadedResp {
func (client *Client) StoragePackageLoaded(msg *StoragePackageLoaded) (*StoragePackageLoadedResp, error) {
return mq.Request(Service.StoragePackageLoaded, client.rabbitCli, msg)
}

// 查询Package的导入记录
var _ = Register(Service.GetPackageLoadLogDetails)

type GetPackageLoadLogDetails struct {
mq.MessageBodyBase
PackageID cdssdk.PackageID `json:"packageID"`
}
type GetPackageLoadLogDetailsResp struct {
mq.MessageBodyBase
Logs []PackageLoadLogDetail `json:"logs"`
}
type PackageLoadLogDetail struct {
Storage model.Storage `json:"storage"`
UserID cdssdk.UserID `json:"userID"`
CreateTime time.Time `json:"createTime"`
}

func ReqGetPackageLoadLogDetails(packageID cdssdk.PackageID) *GetPackageLoadLogDetails {
return &GetPackageLoadLogDetails{
PackageID: packageID,
}
}
func RespGetPackageLoadLogDetails(logs []PackageLoadLogDetail) *GetPackageLoadLogDetailsResp {
return &GetPackageLoadLogDetailsResp{
Logs: logs,
}
}
func (client *Client) GetPackageLoadLogDetails(msg *GetPackageLoadLogDetails) (*GetPackageLoadLogDetailsResp, error) {
return mq.Request(Service.GetPackageLoadLogDetails, client.rabbitCli, msg)
}

+ 18
- 0
common/pkgs/mq/scanner/event/update_package_access_stat_amount.go View File

@@ -0,0 +1,18 @@
package event

import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

// UpdatePackageAccessStatAmount asks the scanner to recompute the history
// access amount of the listed packages. A nil PackageIDs means all packages
// (see the scanner-side Execute, which calls UpdateAllAmount in that case).
type UpdatePackageAccessStatAmount struct {
	EventBase
	PackageIDs []cdssdk.PackageID `json:"packageIDs"`
}

// NewUpdatePackageAccessStatAmount builds the event; pass nil to target
// every package.
func NewUpdatePackageAccessStatAmount(packageIDs []cdssdk.PackageID) *UpdatePackageAccessStatAmount {
	evt := UpdatePackageAccessStatAmount{}
	evt.PackageIDs = packageIDs
	return &evt
}

// Register the event type so it can be (de)serialized as an MQ scanner event.
func init() {
	Register[*UpdatePackageAccessStatAmount]()
}

+ 7
- 0
common/pkgs/package_stat/config.go View File

@@ -0,0 +1,7 @@
package packagestat

import "time"

// Config configures the in-memory package access statistics reporter.
type Config struct {
	// ReportInterval is how often accumulated counters are flushed to the
	// coordinator.
	ReportInterval time.Duration
}

+ 98
- 0
common/pkgs/package_stat/package_stat.go View File

@@ -0,0 +1,98 @@
package packagestat

import (
"fmt"
"sync"
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/sync2"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

// PackageStatEvent is an event emitted from PackageStat.Start's channel;
// as implemented, only errors are sent (see Start).
type PackageStatEvent interface{}

// amountKey identifies one access counter: which package was accessed via
// which node.
type amountKey struct {
	PackageID cdssdk.PackageID
	NodeID    cdssdk.NodeID
}

// amount is the accumulated access counter for one amountKey.
type amount struct {
	Counter float64
}

// PackageStat accumulates package access counters in memory and periodically
// reports them to the coordinator (see Start).
type PackageStat struct {
	cfg     Config
	amounts map[amountKey]*amount
	lock    sync.Mutex // guards amounts
}

// NewPackageStat creates a reporter with an empty counter table.
func NewPackageStat(cfg Config) *PackageStat {
	ps := PackageStat{cfg: cfg}
	ps.amounts = map[amountKey]*amount{}
	return &ps
}

// AddAccessCounter adds value to the access counter of the
// (pkgID, nodeID) pair, creating the counter on first use.
// Safe for concurrent callers.
func (p *PackageStat) AddAccessCounter(pkgID cdssdk.PackageID, nodeID cdssdk.NodeID, value float64) {
	p.lock.Lock()
	defer p.lock.Unlock()

	key := amountKey{
		PackageID: pkgID,
		NodeID:    nodeID,
	}
	// Single map lookup instead of check-then-index (the original did three
	// lookups for the same key).
	amt, ok := p.amounts[key]
	if !ok {
		amt = &amount{}
		p.amounts[key] = amt
	}
	amt.Counter += value
}

// Start launches the background reporter goroutine and returns the channel
// on which it publishes events (currently only errors).
//
// Every ReportInterval the goroutine swaps out the accumulated counters and
// reports them to the coordinator; on failure the unsent counters are merged
// back so they are retried on the next tick.
//
// Bug fix: the original did not return after a failed CoordinatorMQPool
// Acquire, so it went on to Release and use a nil client.
func (p *PackageStat) Start() *sync2.UnboundChannel[PackageStatEvent] {
	ch := sync2.NewUnboundChannel[PackageStatEvent]()

	go func() {
		coorCli, err := stgglb.CoordinatorMQPool.Acquire()
		if err != nil {
			// Without a client the loop below could only fail; report and stop.
			ch.Send(fmt.Errorf("new coordinator client: %w", err))
			return
		}
		defer stgglb.CoordinatorMQPool.Release(coorCli)

		ticker := time.NewTicker(p.cfg.ReportInterval)
		defer ticker.Stop()
		for range ticker.C {
			// Hold the lock only for the swap; amts is private afterwards.
			p.lock.Lock()
			amts := p.amounts
			p.amounts = make(map[amountKey]*amount)
			p.lock.Unlock()

			var addEntries []coormq.AddPackageAccessStatCounterEntry
			for key, amt := range amts {
				addEntries = append(addEntries, coormq.AddPackageAccessStatCounterEntry{
					PackageID: key.PackageID,
					NodeID:    key.NodeID,
					Value:     amt.Counter,
				})
			}
			if len(addEntries) == 0 {
				// Nothing accumulated this interval; skip the MQ round trip.
				continue
			}

			_, err := coorCli.AddPackageAccessStatCounter(coormq.NewAddPackageAccessStatCounter(addEntries))
			if err != nil {
				logger.Errorf("add all package access stat counter: %v", err)

				// Merge the unsent counters back so they are not lost.
				p.lock.Lock()
				for key, a := range amts {
					cur, ok := p.amounts[key]
					if !ok {
						cur = &amount{}
						p.amounts[key] = cur
					}
					cur.Counter += a.Counter
				}
				p.lock.Unlock()
			}
		}
	}()
	return ch
}

+ 13
- 30
coordinator/internal/mq/package.go View File

@@ -10,7 +10,6 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

@@ -129,6 +128,13 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack
Warnf("deleting unused package: %w", err.Error())
}

err = svc.db.PackageAccessStat().DeleteByPackageID(tx, msg.PackageID)
if err != nil {
logger.WithField("UserID", msg.UserID).
WithField("PackageID", msg.PackageID).
Warnf("deleting package access stat: %w", err.Error())
}

return nil
})
if err != nil {
@@ -215,36 +221,13 @@ func (svc *Service) GetPackageLoadedNodes(msg *coormq.GetPackageLoadedNodes) (*c
return mq.ReplyOK(coormq.NewGetPackageLoadedNodesResp(nodeIDs))
}

func (svc *Service) GetPackageLoadLogDetails(msg *coormq.GetPackageLoadLogDetails) (*coormq.GetPackageLoadLogDetailsResp, *mq.CodeMessage) {
var logs []coormq.PackageLoadLogDetail
rawLogs, err := svc.db.StoragePackageLog().GetByPackageID(svc.db.SQLCtx(), msg.PackageID)
func (svc *Service) AddPackageAccessStatCounter(msg *coormq.AddPackageAccessStatCounter) (*coormq.AddPackageAccessStatCounterResp, *mq.CodeMessage) {
err := svc.db.PackageAccessStat().BatchAddCounter(svc.db.SQLCtx(), msg.Entries)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("getting storage package log: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get storage package log failed")
}

stgs := make(map[cdssdk.StorageID]model.Storage)

for _, raw := range rawLogs {
stg, ok := stgs[raw.StorageID]
if !ok {
stg, err = svc.db.Storage().GetByID(svc.db.SQLCtx(), raw.StorageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("getting storage: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get storage failed")
}

stgs[raw.StorageID] = stg
}

logs = append(logs, coormq.PackageLoadLogDetail{
Storage: stg,
UserID: raw.UserID,
CreateTime: raw.CreateTime,
})
errMsg := fmt.Sprintf("batch add package access stat counter: %s", err.Error())
logger.Error(errMsg)
return nil, mq.Failed(errorcode.OperationFailed, errMsg)
}

return mq.ReplyOK(coormq.RespGetPackageLoadLogDetails(logs))
return mq.ReplyOK(coormq.NewAddPackageAccessStatCounterResp())
}

+ 0
- 6
coordinator/internal/mq/storage.go View File

@@ -3,7 +3,6 @@ package mq
import (
"database/sql"
"fmt"
"time"

"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/common/consts/errorcode"
@@ -54,11 +53,6 @@ func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coo
return fmt.Errorf("creating storage package: %w", err)
}

err = svc.db.StoragePackageLog().Create(tx, msg.StorageID, msg.PackageID, msg.UserID, time.Now())
if err != nil {
return fmt.Errorf("creating storage package log: %w", err)
}

stg, err := svc.db.Storage().GetByID(tx, msg.StorageID)
if err != nil {
return fmt.Errorf("getting storage: %w", err)


+ 7
- 6
scanner/internal/config/config.go View File

@@ -9,12 +9,13 @@ import (
)

type Config struct {
ECFileSizeThreshold int64 `json:"ecFileSizeThreshold"`
NodeUnavailableSeconds int `json:"nodeUnavailableSeconds"` // 如果节点上次上报时间超过这个值,则认为节点已经不可用
Logger log.Config `json:"logger"`
DB db.Config `json:"db"`
RabbitMQ stgmq.Config `json:"rabbitMQ"`
DistLock distlock.Config `json:"distlock"`
AccessStatHistoryAmount float64 `json:"accessStatHistoryAmount"`
ECFileSizeThreshold int64 `json:"ecFileSizeThreshold"`
NodeUnavailableSeconds int `json:"nodeUnavailableSeconds"` // 如果节点上次上报时间超过这个值,则认为节点已经不可用
Logger log.Config `json:"logger"`
DB db.Config `json:"db"`
RabbitMQ stgmq.Config `json:"rabbitMQ"`
DistLock distlock.Config `json:"distlock"`
}

var cfg Config


+ 20
- 52
scanner/internal/event/check_package_redundancy.go View File

@@ -40,9 +40,8 @@ func NewCheckPackageRedundancy(evt *scevt.CheckPackageRedundancy) *CheckPackageR
}

type NodeLoadInfo struct {
Node cdssdk.Node
LoadsRecentMonth int
LoadsRecentYear int
Node cdssdk.Node
AccessAmount float64
}

func (t *CheckPackageRedundancy) TryMerge(other Event) bool {
@@ -62,6 +61,8 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
log.Debugf("end, time: %v", time.Since(startTime))
}()

// TODO 应该像其他event一样直接读取数据库

coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
log.Warnf("new coordinator client: %s", err.Error())
@@ -75,9 +76,9 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
return
}

getLogs, err := coorCli.GetPackageLoadLogDetails(coormq.ReqGetPackageLoadLogDetails(t.PackageID))
stats, err := execCtx.Args.DB.PackageAccessStat().GetByPackageID(execCtx.Args.DB.SQLCtx(), t.PackageID)
if err != nil {
log.Warnf("getting package load log details: %s", err.Error())
log.Warnf("getting package access stats: %s", err.Error())
return
}

@@ -100,18 +101,12 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
}
}

for _, log := range getLogs.Logs {
info, ok := userAllNodes[log.Storage.NodeID]
for _, stat := range stats {
info, ok := userAllNodes[stat.NodeID]
if !ok {
continue
}

sinceNow := time.Since(log.CreateTime)
if sinceNow.Hours() < monthHours {
info.LoadsRecentMonth++
} else if sinceNow.Hours() < yearHours {
info.LoadsRecentYear++
}
info.AccessAmount = stat.Amount
}

var changedObjects []coormq.UpdatingObjectRedundancy
@@ -215,8 +210,11 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) {
func (t *CheckPackageRedundancy) chooseRedundancy(obj stgmod.ObjectDetail, userAllNodes map[cdssdk.NodeID]*NodeLoadInfo) (cdssdk.Redundancy, []*NodeLoadInfo) {
switch obj.Object.Redundancy.(type) {
case *cdssdk.NoneRedundancy:
newLRCNodes := t.chooseNewNodesForLRC(&cdssdk.DefaultLRCRedundancy, userAllNodes)
return &cdssdk.DefaultLRCRedundancy, newLRCNodes
newNodes := t.chooseNewNodesForEC(&cdssdk.DefaultECRedundancy, userAllNodes)
return &cdssdk.DefaultECRedundancy, newNodes

// newLRCNodes := t.chooseNewNodesForLRC(&cdssdk.DefaultLRCRedundancy, userAllNodes)
// return &cdssdk.DefaultLRCRedundancy, newLRCNodes

case *cdssdk.LRCRedundancy:
newLRCNodes := t.rechooseNodesForLRC(obj, &cdssdk.DefaultLRCRedundancy, userAllNodes)
@@ -263,12 +261,7 @@ func (t *CheckPackageRedundancy) summaryRepObjectBlockNodes(objs []stgmod.Object

func (t *CheckPackageRedundancy) chooseNewNodesForRep(red *cdssdk.RepRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo {
sortedNodes := sort2.Sort(lo.Values(allNodes), func(left *NodeLoadInfo, right *NodeLoadInfo) int {
dm := right.LoadsRecentMonth - left.LoadsRecentMonth
if dm != 0 {
return dm
}

return right.LoadsRecentYear - left.LoadsRecentYear
return sort2.Cmp(right.AccessAmount, left.AccessAmount)
})

return t.chooseSoManyNodes(red.RepCount, sortedNodes)
@@ -276,12 +269,7 @@ func (t *CheckPackageRedundancy) chooseNewNodesForRep(red *cdssdk.RepRedundancy,

func (t *CheckPackageRedundancy) chooseNewNodesForEC(red *cdssdk.ECRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo {
sortedNodes := sort2.Sort(lo.Values(allNodes), func(left *NodeLoadInfo, right *NodeLoadInfo) int {
dm := right.LoadsRecentMonth - left.LoadsRecentMonth
if dm != 0 {
return dm
}

return right.LoadsRecentYear - left.LoadsRecentYear
return sort2.Cmp(right.AccessAmount, left.AccessAmount)
})

return t.chooseSoManyNodes(red.N, sortedNodes)
@@ -289,12 +277,7 @@ func (t *CheckPackageRedundancy) chooseNewNodesForEC(red *cdssdk.ECRedundancy, a

func (t *CheckPackageRedundancy) chooseNewNodesForLRC(red *cdssdk.LRCRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo {
sortedNodes := sort2.Sort(lo.Values(allNodes), func(left *NodeLoadInfo, right *NodeLoadInfo) int {
dm := right.LoadsRecentMonth - left.LoadsRecentMonth
if dm != 0 {
return dm
}

return right.LoadsRecentYear - left.LoadsRecentYear
return sort2.Cmp(right.AccessAmount, left.AccessAmount)
})

return t.chooseSoManyNodes(red.N, sortedNodes)
@@ -323,18 +306,13 @@ func (t *CheckPackageRedundancy) rechooseNodesForRep(mostBlockNodeIDs []cdssdk.N
}

sortedNodes := sort2.Sort(rechooseNodes, func(left *rechooseNode, right *rechooseNode) int {
dm := right.LoadsRecentMonth - left.LoadsRecentMonth
if dm != 0 {
return dm
}

// 已经缓存了文件块的节点优先选择
v := sort2.CmpBool(right.HasBlock, left.HasBlock)
if v != 0 {
return v
}

return right.LoadsRecentYear - left.LoadsRecentYear
return sort2.Cmp(right.AccessAmount, left.AccessAmount)
})

return t.chooseSoManyNodes(red.RepCount, lo.Map(sortedNodes, func(node *rechooseNode, idx int) *NodeLoadInfo { return node.NodeLoadInfo }))
@@ -363,18 +341,13 @@ func (t *CheckPackageRedundancy) rechooseNodesForEC(obj stgmod.ObjectDetail, red
}

sortedNodes := sort2.Sort(rechooseNodes, func(left *rechooseNode, right *rechooseNode) int {
dm := right.LoadsRecentMonth - left.LoadsRecentMonth
if dm != 0 {
return dm
}

// 已经缓存了文件块的节点优先选择
v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1)
if v != 0 {
return v
}

return right.LoadsRecentYear - left.LoadsRecentYear
return sort2.Cmp(right.AccessAmount, left.AccessAmount)
})

// TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择
@@ -404,18 +377,13 @@ func (t *CheckPackageRedundancy) rechooseNodesForLRC(obj stgmod.ObjectDetail, re
}

sortedNodes := sort2.Sort(rechooseNodes, func(left *rechooseNode, right *rechooseNode) int {
dm := right.LoadsRecentMonth - left.LoadsRecentMonth
if dm != 0 {
return dm
}

// 已经缓存了文件块的节点优先选择
v := sort2.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1)
if v != 0 {
return v
}

return right.LoadsRecentYear - left.LoadsRecentYear
return sort2.Cmp(right.AccessAmount, left.AccessAmount)
})

// TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择


+ 10
- 3
scanner/internal/event/clean_pinned.go View File

@@ -54,6 +54,7 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) {
log.Debugf("end, time: %v", time.Since(startTime))
}()

// TODO 应该与其他event一样,直接访问数据库
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
log.Warnf("new coordinator client: %s", err.Error())
@@ -67,12 +68,18 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) {
return
}

getLoadLog, err := coorCli.GetPackageLoadLogDetails(coormq.ReqGetPackageLoadLogDetails(t.PackageID))
stats, err := execCtx.Args.DB.PackageAccessStat().GetByPackageID(execCtx.Args.DB.SQLCtx(), t.PackageID)
if err != nil {
log.Warnf("getting package load log details: %s", err.Error())
log.Warnf("getting package access stat: %s", err.Error())
return
}
readerNodeIDs := lo.Map(getLoadLog.Logs, func(item coormq.PackageLoadLogDetail, idx int) cdssdk.NodeID { return item.Storage.NodeID })
var readerNodeIDs []cdssdk.NodeID
for _, item := range stats {
// TODO 可以考虑做成配置
if item.Amount >= float64(len(getObjs.Objects)/2) {
readerNodeIDs = append(readerNodeIDs, item.NodeID)
}
}

// 注意!需要保证allNodeID包含所有之后可能用到的节点ID
// TODO 可以考虑设计Cache机制


+ 68
- 0
scanner/internal/event/update_package_access_stat_amount.go View File

@@ -0,0 +1,68 @@
package event

import (
"time"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/logger"
scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
"gitlink.org.cn/cloudream/storage/scanner/internal/config"
)

// UpdatePackageAccessStatAmount wraps the MQ event of the same name so the
// scanner's event executor can run it.
type UpdatePackageAccessStatAmount struct {
	*scevt.UpdatePackageAccessStatAmount
}

// NewUpdatePackageAccessStatAmount wraps the raw MQ event into an executable
// scanner event.
func NewUpdatePackageAccessStatAmount(evt *scevt.UpdatePackageAccessStatAmount) *UpdatePackageAccessStatAmount {
	e := UpdatePackageAccessStatAmount{}
	e.UpdatePackageAccessStatAmount = evt
	return &e
}

// TryMerge folds another event of the same type into this one.
// A nil PackageIDs list means "all packages", so it absorbs any other list;
// otherwise the two lists are unioned.
func (t *UpdatePackageAccessStatAmount) TryMerge(other Event) bool {
	evt, ok := other.(*UpdatePackageAccessStatAmount)
	if !ok {
		return false
	}

	switch {
	case t.PackageIDs == nil:
		// Already covers every package; nothing to add.
	case evt.PackageIDs == nil:
		// The other event covers everything; widen this one.
		t.PackageIDs = nil
	default:
		t.PackageIDs = lo.Uniq(append(t.PackageIDs, evt.PackageIDs...))
	}
	return true
}

// Execute recomputes the history access amount of packages in the database.
// A nil PackageIDs updates every package; otherwise only the listed ones.
func (t *UpdatePackageAccessStatAmount) Execute(execCtx ExecuteContext) {
	// Bug fix: the log was tagged with AgentCacheGC, an unrelated event type.
	log := logger.WithType[UpdatePackageAccessStatAmount]("Event")
	startTime := time.Now()
	log.Debugf("begin with %v", logger.FormatStruct(t.UpdatePackageAccessStatAmount))
	defer func() {
		log.Debugf("end, time: %v", time.Since(startTime))
	}()

	if t.PackageIDs == nil {
		// nil means "all packages".
		err := execCtx.Args.DB.PackageAccessStat().UpdateAllAmount(execCtx.Args.DB.SQLCtx(), config.Cfg().AccessStatHistoryAmount)
		if err != nil {
			log.Warnf("update all package access stat amount: %v", err)
		}
		return
	}

	err := execCtx.Args.DB.PackageAccessStat().BatchUpdateAmount(execCtx.Args.DB.SQLCtx(), t.PackageIDs, config.Cfg().AccessStatHistoryAmount)
	if err != nil {
		log.Warnf("batch update package access stat amount: %v", err)
	}
}

// Register a convertor so incoming MQ messages of this event type are turned
// into executable scanner events.
func init() {
	RegisterMessageConvertor(NewUpdatePackageAccessStatAmount)
}

+ 32
- 0
scanner/internal/tickevent/update_all_package_access_stat_amount.go View File

@@ -0,0 +1,32 @@
package tickevent

import (
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
evt "gitlink.org.cn/cloudream/storage/scanner/internal/event"
)

// UpdateAllPackageAccessStatAmount is a tick event that, during hour 0,
// posts an UpdatePackageAccessStatAmount event covering all packages.
type UpdateAllPackageAccessStatAmount struct {
	// todayUpdated is intended to latch that today's update was already
	// posted. NOTE(review): in the current Execute it is written but never
	// read — confirm the intended once-per-day behavior.
	todayUpdated bool
}

// NewUpdateAllPackageAccessStatAmount creates the tick event in its initial
// (not-yet-fired-today) state.
func NewUpdateAllPackageAccessStatAmount() *UpdateAllPackageAccessStatAmount {
	return new(UpdateAllPackageAccessStatAmount)
}

// Execute posts an all-packages UpdatePackageAccessStatAmount event once per
// day, during hour 0.
//
// Bug fix: todayUpdated was set but never checked, so the event was posted
// on every tick that fell inside hour 0 instead of at most once per day.
func (e *UpdateAllPackageAccessStatAmount) Execute(ctx ExecuteContext) {
	log := logger.WithType[UpdateAllPackageAccessStatAmount]("TickEvent")
	log.Debugf("begin")
	defer log.Debugf("end")

	nowHour := time.Now().Hour()
	if nowHour != 0 {
		// Outside the update window: re-arm the once-per-day latch.
		e.todayUpdated = false
		return
	}
	if e.todayUpdated {
		// Already fired during this hour-0 window.
		return
	}
	e.todayUpdated = true

	// nil PackageIDs means "update every package".
	ctx.Args.EventExecutor.Post(evt.NewUpdatePackageAccessStatAmount(event.NewUpdatePackageAccessStatAmount(nil)))
}

+ 2
- 0
scanner/main.go View File

@@ -130,4 +130,6 @@ func startTickEvent(tickExecutor *tickevent.Executor) {
tickExecutor.Start(tickevent.NewBatchCheckPackageRedundancy(), interval, tickevent.StartOption{RandomStartDelayMs: 20 * 60 * 1000})

tickExecutor.Start(tickevent.NewBatchCleanPinned(), interval, tickevent.StartOption{RandomStartDelayMs: 20 * 60 * 1000})

tickExecutor.Start(tickevent.NewUpdateAllPackageAccessStatAmount(), interval, tickevent.StartOption{RandomStartDelayMs: 20 * 60 * 1000})
}

Loading…
Cancel
Save