diff --git a/agent/internal/config/config.go b/agent/internal/config/config.go index 141d5de..8557bd5 100644 --- a/agent/internal/config/config.go +++ b/agent/internal/config/config.go @@ -7,6 +7,7 @@ import ( c "gitlink.org.cn/cloudream/common/utils/config" stgmodels "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" "gitlink.org.cn/cloudream/storage/common/pkgs/grpc" stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" ) @@ -21,6 +22,7 @@ type Config struct { IPFS ipfs.Config `json:"ipfs"` DistLock distlock.Config `json:"distlock"` Connectivity connectivity.Config `json:"connectivity"` + Downloader downloader.Config `json:"downloader"` } var cfg Config diff --git a/agent/internal/grpc/io.go b/agent/internal/grpc/io.go index 8ee17a3..7947d94 100644 --- a/agent/internal/grpc/io.go +++ b/agent/internal/grpc/io.go @@ -5,7 +5,7 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/logger" - myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/common/utils/io2" agentserver "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) @@ -48,8 +48,7 @@ func (s *Service) SendStream(server agentserver.Agent_SendStreamServer) error { return fmt.Errorf("recv message failed, err: %w", err) } - // 将接收到的数据写入管道 - err = myio.WriteAll(pw, msg.Data) + err = io2.WriteAll(pw, msg.Data) if err != nil { pw.CloseWithError(io.ErrClosedPipe) logger.Warnf("write data to file failed, err: %s", err.Error()) diff --git a/agent/internal/grpc/service.go b/agent/internal/grpc/service.go index 669a20c..d208469 100644 --- a/agent/internal/grpc/service.go +++ b/agent/internal/grpc/service.go @@ -5,7 +5,7 @@ import ( "io" log "gitlink.org.cn/cloudream/common/pkgs/logger" - myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/common/utils/io2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" agentserver "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" @@ -52,7 +52,7 @@ func (s *Service) SendIPFSFile(server agentserver.Agent_SendIPFSFileServer) erro return fmt.Errorf("recv message failed, err: %w", err) } - err = myio.WriteAll(writer, msg.Data) // 将数据写入IPFS文件流 + err = io2.WriteAll(writer, msg.Data) if err != nil { writer.Abort(io.ErrClosedPipe) // 写入出错时关闭文件写入 log.Warnf("write data to file failed, err: %s", err.Error()) diff --git a/agent/internal/task/cache_move_package.go b/agent/internal/task/cache_move_package.go index 6a0a6a5..9093a1f 100644 --- a/agent/internal/task/cache_move_package.go +++ b/agent/internal/task/cache_move_package.go @@ -60,23 +60,14 @@ func (t *CacheMovePackage) do(ctx TaskContext) error { } defer stgglb.CoordinatorMQPool.Release(coorCli) - // 获取包内对象详情 - getResp, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.packageID)) - if err != nil { - return fmt.Errorf("getting package object details: %w", err) - } - - // 获取IPFS客户端 ipfsCli, err := stgglb.IPFSPool.Acquire() if err != nil { return fmt.Errorf("new ipfs client: %w", err) } defer ipfsCli.Close() - // 遍历并下载对象 - objIter := iterator.NewDownloadObjectIterator(getResp.Objects, &iterator.DownloadContext{ - Distlock: ctx.distlock, - }) + // TODO 可以考虑优化,比如rep类型的直接pin就可以 + objIter := ctx.downloader.DownloadPackage(t.packageID) defer objIter.Close() for { diff --git a/agent/internal/task/storage_load_package.go b/agent/internal/task/storage_load_package.go index 0dc2907..16c9d0f 100644 --- a/agent/internal/task/storage_load_package.go +++ b/agent/internal/task/storage_load_package.go @@ -13,7 +13,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ipfs" "gitlink.org.cn/cloudream/common/pkgs/task" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/common/utils/io2" myref "gitlink.org.cn/cloudream/common/utils/reflect" "gitlink.org.cn/cloudream/common/utils/sort2" "gitlink.org.cn/cloudream/storage/common/consts" @@ -93,8 +93,7 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e } t.FullOutputPath = outputDirPath - // 获取包对象详情 - getObjectDetails, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.packageID)) + getObjectDetails, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(t.packageID)) if err != nil { return fmt.Errorf("getting package object details: %w", err) } @@ -255,8 +254,7 @@ func (t *StorageLoadPackage) downloadECObject(coorCli *coormq.Client, ipfsCli *i if bsc < osc { var fileStrs []io.ReadCloser - // 初始化RS编码器 - rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize) + rs, err := ec.NewStreamRs(ecRed.K, ecRed.N, ecRed.ChunkSize) if err != nil { return nil, nil, fmt.Errorf("new rs: %w", err) } @@ -276,8 +274,7 @@ func (t *StorageLoadPackage) downloadECObject(coorCli *coormq.Client, ipfsCli *i fileStrs = append(fileStrs, str) } - // 将多个文件流转换为统一的ReadCloser接口 - fileReaders, filesCloser := myio.ToReaders(fileStrs) + fileReaders, filesCloser := io2.ToReaders(fileStrs) // 准备恢复数据所需的信息和变量 var indexes []int @@ -292,9 +289,8 @@ func (t *StorageLoadPackage) downloadECObject(coorCli *coormq.Client, ipfsCli *i }) } - // 执行数据恢复,并将恢复后的数据转换为ReadCloser - outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes)) - return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) { + outputs, outputsCloser := io2.ToReaders(rs.ReconstructData(fileReaders, indexes)) + return io2.AfterReadClosed(io2.Length(io2.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) { filesCloser() outputsCloser() }), pinnedBlocks, nil diff --git a/agent/internal/task/task.go b/agent/internal/task/task.go index b0ed403..84df74e 100644 --- a/agent/internal/task/task.go +++ b/agent/internal/task/task.go @@ -1,10 +1,11 @@ package task import ( - "gitlink.org.cn/cloudream/common/pkgs/distlock" // 引入分布式锁服务 - "gitlink.org.cn/cloudream/common/pkgs/task" // 引入任务处理相关的包 - "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" // 引入网络连接状态收集器 - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" // 引入IO开关服务 + "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/common/pkgs/task" + "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) // TaskContext 定义了任务执行的上下文环境,包含分布式锁服务、IO开关和网络连接状态收集器 @@ -12,6 +13,7 @@ type TaskContext struct { distlock *distlock.Service sw *ioswitch.Switch connectivity *connectivity.Collector + downloader *downloader.Downloader } // CompleteFn 类型定义了任务完成时需要执行的函数,用于设置任务的执行结果 @@ -29,11 +31,11 @@ type Task = task.Task[TaskContext] // CompleteOption 类型定义了任务完成时的选项,可用于定制化任务完成的处理方式 type CompleteOption = task.CompleteOption -// NewManager 创建并返回一个新的任务管理器实例,需要提供分布式锁服务、IO开关和网络连接状态收集器 -func NewManager(distlock *distlock.Service, sw *ioswitch.Switch, connectivity *connectivity.Collector) Manager { +func NewManager(distlock *distlock.Service, sw *ioswitch.Switch, connectivity *connectivity.Collector, downloader *downloader.Downloader) Manager { return task.NewManager(TaskContext{ distlock: distlock, sw: sw, connectivity: connectivity, + downloader: downloader, }) } diff --git a/agent/main.go b/agent/main.go index 7e85e2c..ff01bfb 100644 --- a/agent/main.go +++ b/agent/main.go @@ -13,6 +13,7 @@ import ( stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" @@ -104,11 +105,13 @@ func main() { // 初始化数据切换开关 sw := ioswitch.NewSwitch() - // 启动任务管理器和相关服务 + dlder := downloader.NewDownloader(config.Cfg().Downloader) + + //处置协调端、客户端命令(可多建几个) wg := sync.WaitGroup{} wg.Add(4) - taskMgr := task.NewManager(distlock, &sw, &conCol) + taskMgr := task.NewManager(distlock, &sw, &conCol, &dlder) // 启动命令服务器 agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr, &sw), config.Cfg().ID, &config.Cfg().RabbitMQ) diff --git a/client/internal/config/config.go b/client/internal/config/config.go index 62fa7d4..aed9098 100644 --- a/client/internal/config/config.go +++ b/client/internal/config/config.go @@ -7,6 +7,7 @@ import ( "gitlink.org.cn/cloudream/common/utils/config" stgmodels "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" ) @@ -19,6 +20,7 @@ type Config struct { IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon DistLock distlock.Config `json:"distlock"` Connectivity connectivity.Config `json:"connectivity"` + Downloader downloader.Config `json:"downloader"` } var cfg Config diff --git a/client/internal/http/object.go b/client/internal/http/object.go index 9cb3e76..9ceeb8c 100644 --- a/client/internal/http/object.go +++ b/client/internal/http/object.go @@ -13,6 +13,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" myhttp "gitlink.org.cn/cloudream/common/utils/http" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" ) // ObjectService 服务结构体,处理对象相关的HTTP请求 @@ -100,7 +101,17 @@ func (s *ObjectService) Download(ctx *gin.Context) { return } - file, err := s.svc.ObjectSvc().Download(req.UserID, req.ObjectID) + off := req.Offset + len := int64(-1) + if req.Length != nil { + len = *req.Length + } + + file, err := s.svc.ObjectSvc().Download(req.UserID, downloader.DownloadReqeust{ + ObjectID: req.ObjectID, + Offset: off, + Length: len, + }) if err != nil { log.Warnf("downloading object: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "download object failed")) diff --git a/client/internal/services/object.go b/client/internal/services/object.go index 594c412..b4e30c5 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -7,8 +7,8 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" mytask "gitlink.org.cn/cloudream/storage/client/internal/task" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -77,39 +77,15 @@ func (svc *ObjectService) Move(userID cdssdk.UserID, movings []cdssdk.MovingObje return resp.Successes, nil } -// Download 用于下载指定的对象。 -// userID: 表示用户的唯一标识。 -// objectID: 表示要下载的对象的唯一标识。 -// 返回值: 返回一个正在下载的对象的迭代器和可能遇到的错误。 -func (svc *ObjectService) Download(userID cdssdk.UserID, objectID cdssdk.ObjectID) (*iterator.IterDownloadingObject, error) { - // 从协调器MQ池中获取客户端 - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new coordinator client: %w", err) - } - // 确保在函数结束时释放客户端 - defer stgglb.CoordinatorMQPool.Release(coorCli) - - // 向协调器请求对象详情 - resp, err := coorCli.GetObjectDetails(coormq.ReqGetObjectDetails([]cdssdk.ObjectID{objectID})) - if err != nil { - return nil, fmt.Errorf("requesting to coordinator") - } - - // 检查对象是否存在 - if resp.Objects[0] == nil { - return nil, fmt.Errorf("object not found") - } - - // 创建下载对象的迭代器 - iter := iterator.NewDownloadObjectIterator([]stgmod.ObjectDetail{*resp.Objects[0]}, &iterator.DownloadContext{ - Distlock: svc.DistLock, - }) - // 确保在函数结束时关闭迭代器 - defer iter.Close() +func (svc *ObjectService) Download(userID cdssdk.UserID, req downloader.DownloadReqeust) (*downloader.Downloading, error) { + // TODO 检查用户ID + iter := svc.Downloader.DownloadObjects([]downloader.DownloadReqeust{req}) // 初始化下载过程 downloading, err := iter.MoveNext() + if downloading.Object == nil { + return nil, fmt.Errorf("object not found") + } if err != nil { return nil, err } diff --git a/client/internal/services/package.go b/client/internal/services/package.go index 2be932f..c312036 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -6,7 +6,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -82,27 +82,9 @@ func (svc *PackageService) Create(userID cdssdk.UserID, bucketID cdssdk.BucketID return resp.Package, nil } -// DownloadPackage 下载指定包的内容 -func (svc *PackageService) DownloadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID) (iterator.DownloadingObjectIterator, error) { - // 从协调器MQ池中获取客户端 - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new coordinator client: %w", err) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - // 向协调器请求获取包内对象的详情 - getObjsResp, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(packageID)) - if err != nil { - return nil, fmt.Errorf("getting package object details: %w", err) - } - - // 创建下载对象的迭代器 - iter := iterator.NewDownloadObjectIterator(getObjsResp.Objects, &iterator.DownloadContext{ - Distlock: svc.DistLock, - }) - - return iter, nil +func (svc *PackageService) DownloadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID) (downloader.DownloadIterator, error) { + // TODO 检查用户ID + return svc.Downloader.DownloadPackage(packageID), nil } // DeletePackage 删除指定的包 diff --git a/client/internal/services/service.go b/client/internal/services/service.go index cf7770a..dfbc2f6 100644 --- a/client/internal/services/service.go +++ b/client/internal/services/service.go @@ -3,30 +3,22 @@ package services import ( - "gitlink.org.cn/cloudream/common/pkgs/distlock" // 导入分布锁服务包 - "gitlink.org.cn/cloudream/storage/client/internal/task" // 导入任务管理服务包 + "gitlink.org.cn/cloudream/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage/client/internal/task" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" ) // Service 结构体封装了分布锁服务和任务管理服务。 type Service struct { - DistLock *distlock.Service // DistLock 用于分布式环境下的锁服务 - TaskMgr *task.Manager // TaskMgr 用于任务的创建、管理和执行 + DistLock *distlock.Service + TaskMgr *task.Manager + Downloader *downloader.Downloader } -// NewService 创建一个新的Service实例。 -// -// 参数: -// -// distlock *distlock.Service: 分布式锁服务的实例。 -// taskMgr *task.Manager: 任务管理器的实例。 -// -// 返回值: -// -// *Service: 初始化后的Service实例。 -// error: 如果创建过程中遇到错误,则返回错误信息,否则为nil。 -func NewService(distlock *distlock.Service, taskMgr *task.Manager) (*Service, error) { +func NewService(distlock *distlock.Service, taskMgr *task.Manager, downloader *downloader.Downloader) (*Service, error) { return &Service{ - DistLock: distlock, - TaskMgr: taskMgr, + DistLock: distlock, + TaskMgr: taskMgr, + Downloader: downloader, }, nil } diff --git a/client/main.go b/client/main.go index 02cdeea..4f5b22a 100644 --- a/client/main.go +++ b/client/main.go @@ -14,6 +14,7 @@ import ( stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" + "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" ) /* @@ -76,8 +77,14 @@ func main() { // 创建任务管理器 taskMgr := task.NewManager(distlockSvc, &conCol) +<<<<<<< HEAD // 创建服务实例 svc, err := services.NewService(distlockSvc, &taskMgr) +======= + dlder := downloader.NewDownloader(config.Cfg().Downloader) + + svc, err := services.NewService(distlockSvc, &taskMgr, &dlder) +>>>>>>> 770feaf2da11a3de00fa3ec57b16dc54ff31b288 if err != nil { logger.Warnf("new services failed, err: %s", err.Error()) os.Exit(1) diff --git a/common/assets/confs/agent.config.json b/common/assets/confs/agent.config.json index e54c953..867b619 100644 --- a/common/assets/confs/agent.config.json +++ b/common/assets/confs/agent.config.json @@ -35,5 +35,8 @@ }, "connectivity": { "testInterval": 300 + }, + "downloader": { + "maxStripCacheCount": 100 } } \ No newline at end of file diff --git a/common/assets/confs/client.config.json b/common/assets/confs/client.config.json index 9791f83..df81039 100644 --- a/common/assets/confs/client.config.json +++ b/common/assets/confs/client.config.json @@ -28,5 +28,8 @@ }, "connectivity": { "testInterval": 300 + }, + "downloader": { + "maxStripCacheCount": 100 } } \ No newline at end of file diff --git a/common/assets/confs/sysSetting.xml b/common/assets/confs/sysSetting.xml deleted file mode 100644 index d78805b..0000000 --- a/common/assets/confs/sysSetting.xml +++ /dev/null @@ -1,63 +0,0 @@ - - -local.addr -101.201.215.165 - - -controller.addr -101.201.215.196 - - -agents.addr -/hw-sh/123.60.146.162 -/hw-bj/120.46.183.86 -/ali/101.201.215.165 - - -agents.location -ali -hw-sh -hw-bj - - -oec.controller.thread.num -4 - - -oec.agent.thread.num -2 - - -oec.cmddist.thread.num -2 - - -packet.size -131072 - - -ec.concurrent.num -2 - - -ec.policy -rs_9_6RS96961-1 -rs_3_2RS96321-1 -edu_9_6EDU96961-1 -edu_3_2EDU32321-1 -dfc_9_4DFC941-13,2 - - -inter.inner.addr - -172.23.85.69172.23.85.71172.23.85.70 -192.168.0.69 -192.168.0.76 - - -101.201.215.196101.201.215.165101.201.214.111 -123.60.146.162 -120.46.183.86 - - - diff --git a/common/pkgs/db/utils.go b/common/pkgs/db/utils.go index 2614355..c932717 100644 --- a/common/pkgs/db/utils.go +++ b/common/pkgs/db/utils.go @@ -4,7 +4,7 @@ import ( "database/sql" "github.com/jmoiron/sqlx" - "gitlink.org.cn/cloudream/common/utils/math" + "gitlink.org.cn/cloudream/common/utils/math2" ) const ( @@ -27,7 +27,7 @@ func BatchNamedExec[T any](ctx SQLContext, sql string, argCnt int, arr []T, call batchSize := maxPlaceholderCount / argCnt for len(arr) > 0 { - curBatchSize := math.Min(batchSize, len(arr)) + curBatchSize := math2.Min(batchSize, len(arr)) ret, err := ctx.NamedExec(sql, arr[:curBatchSize]) if err != nil { @@ -59,7 +59,7 @@ func BatchNamedQuery[T any](ctx SQLContext, sql string, argCnt int, arr []T, cal batchSize := maxPlaceholderCount / argCnt for len(arr) > 0 { - curBatchSize := math.Min(batchSize, len(arr)) + curBatchSize := math2.Min(batchSize, len(arr)) ret, err := ctx.NamedQuery(sql, arr[:curBatchSize]) if err != nil { diff --git a/common/pkgs/downloader/config.go b/common/pkgs/downloader/config.go new file mode 100644 index 0000000..c89ef6a --- /dev/null +++ b/common/pkgs/downloader/config.go @@ -0,0 +1,6 @@ +package downloader + +type Config struct { + // EC模式的Object的条带缓存数量 + MaxStripCacheCount int `json:"maxStripCacheCount"` +} diff --git a/common/pkgs/downloader/downloader.go b/common/pkgs/downloader/downloader.go new file mode 100644 index 0000000..513736d --- /dev/null +++ b/common/pkgs/downloader/downloader.go @@ -0,0 +1,123 @@ +package downloader + +import ( + "fmt" + "io" + + lru "github.com/hashicorp/golang-lru/v2" + "gitlink.org.cn/cloudream/common/pkgs/iterator" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" +) + +const ( + DefaultMaxStripCacheCount = 128 +) + +type DownloadIterator = iterator.Iterator[*Downloading] + +type DownloadReqeust struct { + ObjectID cdssdk.ObjectID + Offset int64 + Length int64 +} + +type downloadReqeust2 struct { + Detail *stgmod.ObjectDetail + Raw DownloadReqeust +} + +type Downloading struct { + Object *cdssdk.Object + File io.ReadCloser // 文件流,如果文件不存在,那么为nil + Request DownloadReqeust +} + +type Downloader struct { + strips *StripCache +} + +func NewDownloader(cfg Config) Downloader { + if cfg.MaxStripCacheCount == 0 { + cfg.MaxStripCacheCount = DefaultMaxStripCacheCount + } + + ch, _ := lru.New[ECStripKey, ObjectECStrip](cfg.MaxStripCacheCount) + return Downloader{ + strips: ch, + } +} + +func (d *Downloader) DownloadObjects(reqs []DownloadReqeust) DownloadIterator { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return iterator.FuseError[*Downloading](fmt.Errorf("new coordinator client: %w", err)) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + objIDs := make([]cdssdk.ObjectID, len(reqs)) + for i, req := range reqs { + objIDs[i] = req.ObjectID + } + + if len(objIDs) == 0 { + return iterator.Empty[*Downloading]() + } + + objDetails, err := coorCli.GetObjectDetails(coormq.ReqGetObjectDetails(objIDs)) + if err != nil { + return iterator.FuseError[*Downloading](fmt.Errorf("request to coordinator: %w", err)) + } + + req2s := make([]downloadReqeust2, len(reqs)) + for i, req := range reqs { + req2s[i] = downloadReqeust2{ + Detail: objDetails.Objects[i], + Raw: req, + } + } + + return NewDownloadObjectIterator(d, req2s) +} + +func (d *Downloader) DownloadPackage(pkgID cdssdk.PackageID) DownloadIterator { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return iterator.FuseError[*Downloading](fmt.Errorf("new coordinator client: %w", err)) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + pkgDetail, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(pkgID)) + if err != nil { + return iterator.FuseError[*Downloading](fmt.Errorf("request to coordinator: %w", err)) + } + + req2s := make([]downloadReqeust2, len(pkgDetail.Objects)) + for i, objDetail := range pkgDetail.Objects { + dt := objDetail + req2s[i] = downloadReqeust2{ + Detail: &dt, + Raw: DownloadReqeust{ + ObjectID: objDetail.Object.ObjectID, + Offset: 0, + Length: objDetail.Object.Size, + }, + } + } + + return NewDownloadObjectIterator(d, req2s) +} + +type ObjectECStrip struct { + Data []byte + ObjectFileHash string // 添加这条缓存时,Object的FileHash +} + +type ECStripKey struct { + ObjectID cdssdk.ObjectID + StripPosition int64 +} + +type StripCache = lru.Cache[ECStripKey, ObjectECStrip] diff --git a/common/pkgs/downloader/io.go b/common/pkgs/downloader/io.go new file mode 100644 index 0000000..c32cb5a --- /dev/null +++ b/common/pkgs/downloader/io.go @@ -0,0 +1,148 @@ +package downloader + +import ( + "fmt" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/ipfs" + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/io2" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans" +) + +type IPFSReader struct { + node cdssdk.Node + fileHash string + stream io.ReadCloser + offset int64 +} + +func NewIPFSReader(node cdssdk.Node, fileHash string) *IPFSReader { + return &IPFSReader{ + node: node, + fileHash: fileHash, + } +} + +func NewIPFSReaderWithRange(node cdssdk.Node, fileHash string, rng ipfs.ReadOption) io.ReadCloser { + str := &IPFSReader{ + node: node, + fileHash: fileHash, + } + str.Seek(rng.Offset, io.SeekStart) + if rng.Length > 0 { + return io2.Length(str, rng.Length) + } + + return str +} + +func (r *IPFSReader) Seek(offset int64, whence int) (int64, error) { + if whence == io.SeekEnd { + return 0, fmt.Errorf("seek end not supported") + } + + if whence == io.SeekCurrent { + return 0, fmt.Errorf("seek current not supported") + } + + if r.stream == nil { + r.offset = offset + return r.offset, nil + } + + // 如果文件流已经打开,那么如果seek的位置和当前位置不同,那么需要重新打开文件流 + if offset != r.offset { + var err error + r.stream.Close() + r.offset = offset + r.stream, err = r.openStream() + if err != nil { + return 0, fmt.Errorf("reopen stream: %w", err) + } + } + + return r.offset, nil +} + +func (r *IPFSReader) Read(buf []byte) (int, error) { + if r.stream == nil { + var err error + r.stream, err = r.openStream() + if err != nil { + return 0, err + } + } + + n, err := r.stream.Read(buf) + r.offset += int64(n) + return n, err +} + +func (r *IPFSReader) Close() error { + if r.stream != nil { + return r.stream.Close() + } + + return nil +} + +func (r *IPFSReader) openStream() (io.ReadCloser, error) { + if stgglb.IPFSPool != nil { + logger.Infof("try to use local IPFS to download file") + + reader, err := r.fromLocalIPFS() + if err == nil { + return reader, nil + } + + logger.Warnf("download from local IPFS failed, so try to download from node %v, err: %s", r.node.Name, err.Error()) + } + + return r.fromNode() +} + +func (r *IPFSReader) fromNode() (io.ReadCloser, error) { + planBld := plans.NewPlanBuilder() + fileStr := planBld.AtAgent(r.node).IPFSRead(r.fileHash, ipfs.ReadOption{ + Offset: r.offset, + Length: -1, + }).ToExecutor() + + plan, err := planBld.Build() + if err != nil { + return nil, fmt.Errorf("building plan: %w", err) + } + + waiter, err := plans.Execute(*plan) + if err != nil { + return nil, fmt.Errorf("execute plan: %w", err) + } + go func() { + waiter.Wait() + }() + + return waiter.ReadStream(fileStr) +} + +func (r *IPFSReader) fromLocalIPFS() (io.ReadCloser, error) { + ipfsCli, err := stgglb.IPFSPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new ipfs client: %w", err) + } + + reader, err := ipfsCli.OpenRead(r.fileHash, ipfs.ReadOption{ + Offset: r.offset, + Length: -1, + }) + if err != nil { + return nil, fmt.Errorf("read ipfs file failed, err: %w", err) + } + + reader = io2.AfterReadClosed(reader, func(io.ReadCloser) { + ipfsCli.Close() + }) + return reader, nil +} diff --git a/common/pkgs/iterator/download_object_iterator.go b/common/pkgs/downloader/iterator.go similarity index 52% rename from common/pkgs/iterator/download_object_iterator.go rename to common/pkgs/downloader/iterator.go index cd58dc4..922eeef 100644 --- a/common/pkgs/iterator/download_object_iterator.go +++ b/common/pkgs/downloader/iterator.go @@ -1,4 +1,4 @@ -package iterator +package downloader import ( "fmt" @@ -9,26 +9,23 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/bitmap" - "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/ipfs" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/sort2" + "gitlink.org.cn/cloudream/common/utils/sync2" "gitlink.org.cn/cloudream/storage/common/consts" stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" - stgmodels "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" + "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) -type DownloadingObjectIterator = Iterator[*IterDownloadingObject] - -type IterDownloadingObject struct { - Object cdssdk.Object - File io.ReadCloser -} +var errNoDirectReadBlock = fmt.Errorf("no direct read block") type DownloadNodeInfo struct { Node cdssdk.Node @@ -43,23 +40,23 @@ type DownloadContext struct { type DownloadObjectIterator struct { OnClosing func() - objectDetails []stgmodels.ObjectDetail - currentIndex int - inited bool + downloader *Downloader + reqs []downloadReqeust2 + currentIndex int + inited bool - downloadCtx *DownloadContext - coorCli *coormq.Client - allNodes map[cdssdk.NodeID]cdssdk.Node + coorCli *coormq.Client + allNodes map[cdssdk.NodeID]cdssdk.Node } -func NewDownloadObjectIterator(objectDetails []stgmodels.ObjectDetail, downloadCtx *DownloadContext) *DownloadObjectIterator { +func NewDownloadObjectIterator(downloader *Downloader, downloadObjs []downloadReqeust2) *DownloadObjectIterator { return &DownloadObjectIterator{ - objectDetails: objectDetails, - downloadCtx: downloadCtx, + downloader: downloader, + reqs: downloadObjs, } } -func (i *DownloadObjectIterator) MoveNext() (*IterDownloadingObject, error) { +func (i *DownloadObjectIterator) MoveNext() (*Downloading, error) { if !i.inited { if err := i.init(); err != nil { return nil, err @@ -68,8 +65,8 @@ func (i *DownloadObjectIterator) MoveNext() (*IterDownloadingObject, error) { i.inited = true } - if i.currentIndex >= len(i.objectDetails) { - return nil, ErrNoMoreItem + if i.currentIndex >= len(i.reqs) { + return nil, iterator.ErrNoMoreItem } item, err := i.doMove() @@ -85,12 +82,12 @@ func (i *DownloadObjectIterator) init() error { i.coorCli = coorCli allNodeIDs := make(map[cdssdk.NodeID]bool) - for _, obj := range i.objectDetails { - for _, p := range obj.PinnedAt { + for _, obj := range i.reqs { + for _, p := range obj.Detail.PinnedAt { allNodeIDs[p] = true } - for _, b := range obj.Blocks { + for _, b := range obj.Detail.Blocks { allNodeIDs[b.NodeID] = true } } @@ -108,45 +105,55 @@ func (i *DownloadObjectIterator) init() error { return nil } -func (iter *DownloadObjectIterator) doMove() (*IterDownloadingObject, error) { - obj := iter.objectDetails[iter.currentIndex] +func (iter *DownloadObjectIterator) doMove() (*Downloading, error) { + req := iter.reqs[iter.currentIndex] + if req.Detail == nil { + return &Downloading{ + Object: nil, + File: nil, + Request: req.Raw, + }, nil + } - switch red := obj.Object.Redundancy.(type) { + switch red := req.Detail.Object.Redundancy.(type) { case *cdssdk.NoneRedundancy: - reader, err := iter.downloadNoneOrRepObject(obj) + reader, err := iter.downloadNoneOrRepObject(req) if err != nil { return nil, fmt.Errorf("downloading object: %w", err) } - return &IterDownloadingObject{ - Object: obj.Object, - File: reader, + return &Downloading{ + Object: &req.Detail.Object, + File: reader, + Request: req.Raw, }, nil case *cdssdk.RepRedundancy: - reader, err := iter.downloadNoneOrRepObject(obj) + reader, err := iter.downloadNoneOrRepObject(req) if err != nil { return nil, fmt.Errorf("downloading rep object: %w", err) } - return &IterDownloadingObject{ - Object: obj.Object, - File: reader, + return &Downloading{ + Object: &req.Detail.Object, + File: reader, + Request: req.Raw, }, nil case *cdssdk.ECRedundancy: - reader, err := iter.downloadECObject(obj, red) + reader, err := iter.downloadECObject(req, red) if err != nil { return nil, fmt.Errorf("downloading ec object: %w", err) } - return &IterDownloadingObject{ - Object: obj.Object, - File: reader, + return &Downloading{ + Object: &req.Detail.Object, + File: reader, + Request: req.Raw, }, nil } - return nil, fmt.Errorf("unsupported redundancy type: %v", reflect.TypeOf(obj.Object.Redundancy)) + return nil, fmt.Errorf("unsupported redundancy type: %v", reflect.TypeOf(req.Detail.Object.Redundancy)) } func (i *DownloadObjectIterator) Close() { @@ -155,15 +162,20 @@ func (i *DownloadObjectIterator) Close() { } } -func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj stgmodels.ObjectDetail) (io.ReadCloser, error) { +func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj downloadReqeust2) (io.ReadCloser, error) { allNodes, err := iter.sortDownloadNodes(obj) if err != nil { return nil, err } + bsc, blocks := iter.getMinReadingBlockSolution(allNodes, 1) osc, node := iter.getMinReadingObjectSolution(allNodes, 1) if bsc < osc { - return downloadFile(iter.downloadCtx, blocks[0].Node, blocks[0].Block.FileHash) + + return NewIPFSReaderWithRange(blocks[0].Node, blocks[0].Block.FileHash, ipfs.ReadOption{ + Offset: obj.Raw.Offset, + Length: obj.Raw.Length, + }), nil } // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件 @@ -171,48 +183,118 @@ func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj stgmodels.Object return nil, fmt.Errorf("no node has this object") } - return downloadFile(iter.downloadCtx, *node, obj.Object.FileHash) + return NewIPFSReaderWithRange(*node, obj.Detail.Object.FileHash, ipfs.ReadOption{ + Offset: obj.Raw.Offset, + Length: obj.Raw.Length, + }), nil } -func (iter *DownloadObjectIterator) downloadECObject(obj stgmodels.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) { - allNodes, err := iter.sortDownloadNodes(obj) +func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) { + allNodes, err := iter.sortDownloadNodes(req) if err != nil { return nil, err } + bsc, blocks := iter.getMinReadingBlockSolution(allNodes, ecRed.K) osc, node := iter.getMinReadingObjectSolution(allNodes, ecRed.K) + if bsc < osc { - var fileStrs []io.ReadCloser + var fileStrs []*IPFSReader + for _, b := range blocks { + str := NewIPFSReader(b.Node, b.Block.FileHash) - rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize) + fileStrs = append(fileStrs, str) + } + + rs, err := ec.NewRs(ecRed.K, ecRed.N) if err != nil { return nil, fmt.Errorf("new rs: %w", err) } - for i, b := range blocks { - str, err := downloadFile(iter.downloadCtx, b.Node, b.Block.FileHash) - if err != nil { - for i -= 1; i >= 0; i-- { - fileStrs[i].Close() + pr, pw := io.Pipe() + go func() { + defer func() { + for _, str := range fileStrs { + str.Close() } - return nil, fmt.Errorf("donwloading file: %w", err) + }() + + readPos := req.Raw.Offset + totalReadLen := req.Detail.Object.Size - req.Raw.Offset + if req.Raw.Length >= 0 { + totalReadLen = req.Raw.Length } - fileStrs = append(fileStrs, str) - } + for totalReadLen > 0 { + curStripPos := readPos / int64(ecRed.K) / int64(ecRed.ChunkSize) + curStripPosInBytes := curStripPos * int64(ecRed.K) * int64(ecRed.ChunkSize) + nextStripPosInBytes := (curStripPos + 1) * int64(ecRed.K) * int64(ecRed.ChunkSize) + curReadLen := math2.Min(totalReadLen, nextStripPosInBytes-readPos) + readRelativePos := readPos - curStripPosInBytes + cacheKey := ECStripKey{ + ObjectID: req.Detail.Object.ObjectID, + StripPosition: curStripPos, + } - fileReaders, filesCloser := myio.ToReaders(fileStrs) + cache, ok := iter.downloader.strips.Get(cacheKey) + if ok { + if cache.ObjectFileHash == req.Detail.Object.FileHash { + err := io2.WriteAll(pw, cache.Data[readRelativePos:readRelativePos+curReadLen]) + if err != nil { + pw.CloseWithError(err) + break + } + totalReadLen -= curReadLen + readPos += curReadLen + continue + } + + // 如果Object的Hash和Cache的Hash不一致,说明Cache是无效的,需要重新下载 + iter.downloader.strips.Remove(cacheKey) + } - var indexes []int - for _, b := range blocks { - indexes = append(indexes, b.Block.Index) - } + for _, str := range fileStrs { + _, err := str.Seek(curStripPosInBytes, io.SeekStart) + if err != nil { + pw.CloseWithError(err) + break + } + } - outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes)) - return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) { - filesCloser() - outputsCloser() - }), nil + dataBuf := make([]byte, int64(ecRed.K*ecRed.ChunkSize)) + blockArrs := make([][]byte, ecRed.N) + for _, b := range blocks { + if b.Block.Index < ecRed.K { + blockArrs[b.Block.Index] = dataBuf[b.Block.Index*ecRed.ChunkSize : (b.Block.Index+1)*ecRed.ChunkSize] + } else { + blockArrs[b.Block.Index] = make([]byte, ecRed.ChunkSize) + } + } + + err := sync2.ParallelDo(blocks, func(b downloadBlock, idx int) error { + _, err := io.ReadFull(fileStrs[idx], blockArrs[b.Block.Index]) + return err + }) + if err != nil { + pw.CloseWithError(err) + break + } + + err = rs.ReconstructData(blockArrs) + if err != nil { + pw.CloseWithError(err) + break + } + + iter.downloader.strips.Add(cacheKey, ObjectECStrip{ + Data: dataBuf, + ObjectFileHash: req.Detail.Object.FileHash, + }) + // 下次循环就能从Cache中读取数据 + } + }() + + return pr, nil } // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件 @@ -220,24 +302,27 @@ func (iter *DownloadObjectIterator) downloadECObject(obj stgmodels.ObjectDetail, return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(blocks)) } - return downloadFile(iter.downloadCtx, *node, obj.Object.FileHash) + return NewIPFSReaderWithRange(*node, req.Detail.Object.FileHash, ipfs.ReadOption{ + Offset: req.Raw.Offset, + Length: req.Raw.Length, + }), nil } -func (iter *DownloadObjectIterator) sortDownloadNodes(obj stgmodels.ObjectDetail) ([]*DownloadNodeInfo, error) { +func (iter *DownloadObjectIterator) sortDownloadNodes(req downloadReqeust2) ([]*DownloadNodeInfo, error) { var nodeIDs []cdssdk.NodeID - for _, id := range obj.PinnedAt { + for _, id := range req.Detail.PinnedAt { if !lo.Contains(nodeIDs, id) { nodeIDs = append(nodeIDs, id) } } - for _, b := range obj.Blocks { + for _, b := range req.Detail.Blocks { if !lo.Contains(nodeIDs, b.NodeID) { nodeIDs = append(nodeIDs, b.NodeID) } } downloadNodeMap := make(map[cdssdk.NodeID]*DownloadNodeInfo) - for _, id := range obj.PinnedAt { + for _, id := range req.Detail.PinnedAt { node, ok := downloadNodeMap[id] if !ok { mod := iter.allNodes[id] @@ -252,7 +337,7 @@ func (iter *DownloadObjectIterator) sortDownloadNodes(obj stgmodels.ObjectDetail node.ObjectPinned = true } - for _, b := range obj.Blocks { + for _, b := range req.Detail.Blocks { node, ok := downloadNodeMap[b.NodeID] if !ok { mod := iter.allNodes[b.NodeID] @@ -326,62 +411,3 @@ func (iter *DownloadObjectIterator) getNodeDistance(node cdssdk.Node) float64 { return consts.NodeDistanceOther } - -func downloadFile(ctx *DownloadContext, node cdssdk.Node, fileHash string) (io.ReadCloser, error) { - // 如果客户端与节点在同一个地域,则使用内网地址连接节点 - nodeIP := node.ExternalIP - grpcPort := node.ExternalGRPCPort - if node.LocationID == stgglb.Local.LocationID { - nodeIP = node.LocalIP - grpcPort = node.LocalGRPCPort - - logger.Infof("client and node %d are at the same location, use local ip", node.NodeID) - } - - if stgglb.IPFSPool != nil { - logger.Infof("try to use local IPFS to download file") - - reader, err := downloadFromLocalIPFS(ctx, fileHash) - if err == nil { - return reader, nil - } - - logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error()) - } - - return downloadFromNode(ctx, node.NodeID, nodeIP, grpcPort, fileHash) -} - -func downloadFromNode(ctx *DownloadContext, nodeID cdssdk.NodeID, nodeIP string, grpcPort int, fileHash string) (io.ReadCloser, error) { - agtCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort) - if err != nil { - return nil, fmt.Errorf("new agent grpc client: %w", err) - } - - reader, err := agtCli.GetIPFSFile(fileHash) - if err != nil { - return nil, fmt.Errorf("getting ipfs file: %w", err) - } - - reader = myio.AfterReadClosed(reader, func(io.ReadCloser) { - agtCli.Close() - }) - return reader, nil -} - -func downloadFromLocalIPFS(ctx *DownloadContext, fileHash string) (io.ReadCloser, error) { - ipfsCli, err := stgglb.IPFSPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new ipfs client: %w", err) - } - - reader, err := ipfsCli.OpenRead(fileHash) - if err != nil { - return nil, fmt.Errorf("read ipfs file failed, err: %w", err) - } - - reader = myio.AfterReadClosed(reader, func(io.ReadCloser) { - ipfsCli.Close() - }) - return reader, nil -} diff --git a/common/pkgs/ec/rs.go b/common/pkgs/ec/rs.go index 5fc8091..aa72a07 100644 --- a/common/pkgs/ec/rs.go +++ b/common/pkgs/ec/rs.go @@ -1,38 +1,44 @@ package ec import ( - "fmt" - "os" - - "github.com/baohan10/reedsolomon" + "github.com/klauspost/reedsolomon" ) -type rs struct { - r *(reedsolomon.ReedSolomon) - ecN int - ecK int - ecP int +type Rs struct { + encoder reedsolomon.Encoder + ecN int + ecK int + ecP int } -func NewRsEnc(ecK int, ecN int) *rs { - enc := rs{ - ecN: ecN, - ecK: ecK, - ecP: ecN - ecK, +func NewRs(k int, n int) (*Rs, error) { + enc := Rs{ + ecN: n, + ecK: k, + ecP: n - k, } - enc.r = reedsolomon.GetReedSolomonIns(ecK, ecN) - return &enc -} -func (r *rs) Encode(all [][]byte) { - r.r.Encode(all) + encoder, err := reedsolomon.New(k, n-k) + enc.encoder = encoder + return &enc, err } -func (r *rs) Repair(all [][]byte) error { - return r.r.Reconstruct(all) +// 任意k个块恢复出所有原始的数据块。 +// blocks的长度必须为N,且至少有K个元素不为nil +func (r *Rs) ReconstructData(blocks [][]byte) error { + outIndexes := make([]int, r.ecK) + for i := 0; i < r.ecK; i++ { + outIndexes[i] = i + } + + return r.ReconstructAny(blocks, outIndexes) } -func checkErr(err error) { - if err != nil { - fmt.Fprintf(os.Stderr, "Error: %s", err.Error()) +// 重建指定的任意块,可以是数据块或校验块。 +// 在input上原地重建,因此input的长度必须为N。 +func (r *Rs) ReconstructAny(blocks [][]byte, outBlockIdxes []int) error { + required := make([]bool, len(blocks)) + for _, idx := range outBlockIdxes { + required[idx] = true } + return r.encoder.ReconstructAny(blocks, required) } diff --git a/common/pkgs/ec/rs_test.go b/common/pkgs/ec/rs_test.go index fb60f28..7c74ac2 100644 --- a/common/pkgs/ec/rs_test.go +++ b/common/pkgs/ec/rs_test.go @@ -11,7 +11,7 @@ import ( func Test_EncodeReconstruct(t *testing.T) { Convey("编码后使用校验块重建数据", t, func() { - rs, err := NewRs(2, 3, 5) + rs, err := NewStreamRs(2, 3, 5) So(err, ShouldBeNil) outputs := rs.EncodeAll([]io.Reader{ diff --git a/common/pkgs/ec/stream_rs.go b/common/pkgs/ec/stream_rs.go index df7e4b4..5b4ab15 100644 --- a/common/pkgs/ec/stream_rs.go +++ b/common/pkgs/ec/stream_rs.go @@ -4,10 +4,10 @@ import ( "io" "github.com/klauspost/reedsolomon" - myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/common/utils/io2" ) -type Rs struct { +type StreamRs struct { encoder reedsolomon.Encoder ecN int ecK int @@ -15,8 +15,8 @@ type Rs struct { chunkSize int } -func NewRs(k int, n int, chunkSize int) (*Rs, error) { - enc := Rs{ +func NewStreamRs(k int, n int, chunkSize int) (*StreamRs, error) { + enc := StreamRs{ ecN: n, ecK: k, ecP: n - k, @@ -28,7 +28,7 @@ func NewRs(k int, n int, chunkSize int) (*Rs, error) { } // 编码。仅输出校验块 -func (r *Rs) Encode(input []io.Reader) []io.ReadCloser { +func (r *StreamRs) Encode(input []io.Reader) []io.ReadCloser { outReaders := make([]io.ReadCloser, r.ecP) outWriters := make([]*io.PipeWriter, r.ecP) for i := 0; i < r.ecP; i++ { @@ -60,7 +60,7 @@ func (r *Rs) Encode(input []io.Reader) []io.ReadCloser { //输出到outWriter for i := range outWriters { - err := myio.WriteAll(outWriters[i], chunks[i+r.ecK]) + err := io2.WriteAll(outWriters[i], chunks[i+r.ecK]) if err != nil { closeErr = err break loop @@ -77,7 +77,7 @@ func (r *Rs) Encode(input []io.Reader) []io.ReadCloser { } // 编码。输出包含所有的数据块和校验块 -func (r *Rs) EncodeAll(input []io.Reader) []io.ReadCloser { +func (r *StreamRs) EncodeAll(input []io.Reader) []io.ReadCloser { outReaders := make([]io.ReadCloser, r.ecN) outWriters := make([]*io.PipeWriter, r.ecN) for i := 0; i < r.ecN; i++ { @@ -109,7 +109,7 @@ func (r *Rs) EncodeAll(input []io.Reader) []io.ReadCloser { //输出到outWriter for i := range outWriters { - err := myio.WriteAll(outWriters[i], chunks[i]) + err := io2.WriteAll(outWriters[i], chunks[i]) if err != nil { closeErr = err break loop @@ -126,7 +126,7 @@ func (r *Rs) EncodeAll(input []io.Reader) []io.ReadCloser { } // 降级读,任意k个块恢复出所有原始的数据块。 -func (r *Rs) ReconstructData(input []io.Reader, inBlockIdx []int) []io.ReadCloser { +func (r *StreamRs) ReconstructData(input []io.Reader, inBlockIdx []int) []io.ReadCloser { outIndexes := make([]int, r.ecK) for i := 0; i < r.ecK; i++ { outIndexes[i] = i @@ -137,7 +137,7 @@ func (r *Rs) ReconstructData(input []io.Reader, inBlockIdx []int) []io.ReadClose // 修复,任意k个块恢复指定的数据块。 // 调用者应该保证input的每一个流长度相同,且均为chunkSize的整数倍 -func (r *Rs) ReconstructSome(input []io.Reader, inBlockIdx []int, outBlockIdx []int) []io.ReadCloser { +func (r *StreamRs) ReconstructSome(input []io.Reader, inBlockIdx []int, outBlockIdx []int) []io.ReadCloser { outReaders := make([]io.ReadCloser, len(outBlockIdx)) outWriters := make([]*io.PipeWriter, len(outBlockIdx)) for i := 0; i < len(outBlockIdx); i++ { @@ -181,7 +181,7 @@ func (r *Rs) ReconstructSome(input []io.Reader, inBlockIdx []int, outBlockIdx [] //输出到outWriter for i := range outBlockIdx { - err := myio.WriteAll(outWriters[i], chunks[outBlockIdx[i]]) + err := io2.WriteAll(outWriters[i], chunks[outBlockIdx[i]]) if err != nil { closeErr = err break loop @@ -204,7 +204,7 @@ func (r *Rs) ReconstructSome(input []io.Reader, inBlockIdx []int, outBlockIdx [] // 重建任意块,包括数据块和校验块。 // 当前的实现会把不需要的块都重建出来,所以应该避免使用这个函数。 -func (r *Rs) ReconstructAny(input []io.Reader, inBlockIdxes []int, outBlockIdxes []int) []io.ReadCloser { +func (r *StreamRs) ReconstructAny(input []io.Reader, inBlockIdxes []int, outBlockIdxes []int) []io.ReadCloser { outReaders := make([]io.ReadCloser, len(outBlockIdxes)) outWriters := make([]*io.PipeWriter, len(outBlockIdxes)) for i := 0; i < len(outBlockIdxes); i++ { @@ -250,7 +250,7 @@ func (r *Rs) ReconstructAny(input []io.Reader, inBlockIdxes []int, outBlockIdxes for i := range outBlockIdxes { outIndex := outBlockIdxes[i] - err := myio.WriteAll(outWriters[i], chunks[outIndex]) + err := io2.WriteAll(outWriters[i], chunks[outIndex]) if err != nil { closeErr = err break loop diff --git a/common/pkgs/ioswitch/ops/chunked_join.go b/common/pkgs/ioswitch/ops/chunked_join.go index 691af52..51fd472 100644 --- a/common/pkgs/ioswitch/ops/chunked_join.go +++ b/common/pkgs/ioswitch/ops/chunked_join.go @@ -5,7 +5,7 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" - myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) @@ -34,7 +34,7 @@ func (o *ChunkedJoin) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error fut := future.NewSetVoid() sw.StreamReady(planID, ioswitch.NewStream(o.OutputID, - myio.AfterReadClosedOnce(myio.ChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) { + io2.AfterReadClosedOnce(io2.ChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) { fut.SetVoid() }), ), diff --git a/common/pkgs/ioswitch/ops/chunked_split.go b/common/pkgs/ioswitch/ops/chunked_split.go index 0c8eb96..ae32d98 100644 --- a/common/pkgs/ioswitch/ops/chunked_split.go +++ b/common/pkgs/ioswitch/ops/chunked_split.go @@ -4,7 +4,7 @@ import ( "io" "sync" - myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) @@ -24,7 +24,7 @@ func (o *ChunkedSplit) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) erro defer str[0].Stream.Close() wg := sync.WaitGroup{} - outputs := myio.ChunkedSplit(str[0].Stream, o.ChunkSize, o.StreamCount, myio.ChunkedSplitOption{ + outputs := io2.ChunkedSplit(str[0].Stream, o.ChunkSize, o.StreamCount, io2.ChunkedSplitOption{ PaddingZeros: o.PaddingZeros, }) @@ -33,7 +33,7 @@ func (o *ChunkedSplit) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) erro sw.StreamReady(planID, ioswitch.NewStream( o.OutputIDs[i], - myio.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { + io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { wg.Done() }), )) diff --git a/common/pkgs/ioswitch/ops/clone.go b/common/pkgs/ioswitch/ops/clone.go index 670159a..97c9958 100644 --- a/common/pkgs/ioswitch/ops/clone.go +++ b/common/pkgs/ioswitch/ops/clone.go @@ -4,7 +4,7 @@ import ( "io" "sync" - myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) @@ -21,13 +21,13 @@ func (o *Clone) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { defer strs[0].Stream.Close() wg := sync.WaitGroup{} - cloned := myio.Clone(strs[0].Stream, len(o.OutputIDs)) + cloned := io2.Clone(strs[0].Stream, len(o.OutputIDs)) for i, s := range cloned { wg.Add(1) sw.StreamReady(planID, ioswitch.NewStream(o.OutputIDs[i], - myio.AfterReadClosedOnce(s, func(closer io.ReadCloser) { + io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) { wg.Done() }), ), diff --git a/common/pkgs/ioswitch/ops/ec.go b/common/pkgs/ioswitch/ops/ec.go index cc7cbfc..43b6bdf 100644 --- a/common/pkgs/ioswitch/ops/ec.go +++ b/common/pkgs/ioswitch/ops/ec.go @@ -6,7 +6,7 @@ import ( "sync" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) @@ -20,7 +20,7 @@ type ECReconstructAny struct { } func (o *ECReconstructAny) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { - rs, err := ec.NewRs(o.EC.K, o.EC.N, o.EC.ChunkSize) + rs, err := ec.NewStreamRs(o.EC.K, o.EC.N, o.EC.ChunkSize) if err != nil { return fmt.Errorf("new ec: %w", err) } @@ -45,7 +45,7 @@ func (o *ECReconstructAny) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) wg := sync.WaitGroup{} for i, id := range o.OutputIDs { wg.Add(1) - sw.StreamReady(planID, ioswitch.NewStream(id, myio.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { + sw.StreamReady(planID, ioswitch.NewStream(id, io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { wg.Done() }))) } @@ -62,7 +62,7 @@ type ECReconstruct struct { } func (o *ECReconstruct) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { - rs, err := ec.NewRs(o.EC.K, o.EC.N, o.EC.ChunkSize) + rs, err := ec.NewStreamRs(o.EC.K, o.EC.N, o.EC.ChunkSize) if err != nil { return fmt.Errorf("new ec: %w", err) } @@ -87,7 +87,7 @@ func (o *ECReconstruct) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) err wg := sync.WaitGroup{} for i, id := range o.OutputIDs { wg.Add(1) - sw.StreamReady(planID, ioswitch.NewStream(id, myio.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { + sw.StreamReady(planID, ioswitch.NewStream(id, io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { wg.Done() }))) } diff --git a/common/pkgs/ioswitch/ops/file.go b/common/pkgs/ioswitch/ops/file.go index 8f9adcd..554f4a2 100644 --- a/common/pkgs/ioswitch/ops/file.go +++ b/common/pkgs/ioswitch/ops/file.go @@ -8,7 +8,7 @@ import ( "path" "gitlink.org.cn/cloudream/common/pkgs/future" - myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) @@ -56,7 +56,7 @@ func (o *FileRead) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { } fut := future.NewSetVoid() - sw.StreamReady(planID, ioswitch.NewStream(o.OutputID, myio.AfterReadClosed(file, func(closer io.ReadCloser) { + sw.StreamReady(planID, ioswitch.NewStream(o.OutputID, io2.AfterReadClosed(file, func(closer io.ReadCloser) { fut.SetVoid() }))) diff --git a/common/pkgs/ioswitch/ops/grpc.go b/common/pkgs/ioswitch/ops/grpc.go index b5d5cfe..ebfc422 100644 --- a/common/pkgs/ioswitch/ops/grpc.go +++ b/common/pkgs/ioswitch/ops/grpc.go @@ -8,7 +8,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/common/utils/io2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) @@ -66,7 +66,7 @@ func (o *GRPCFetch) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { } fut := future.NewSetVoid() - str = myio.AfterReadClosedOnce(str, func(closer io.ReadCloser) { + str = io2.AfterReadClosedOnce(str, func(closer io.ReadCloser) { fut.SetVoid() }) diff --git a/common/pkgs/ioswitch/ops/ipfs.go b/common/pkgs/ioswitch/ops/ipfs.go index 7ead8d6..3b63ca9 100644 --- a/common/pkgs/ioswitch/ops/ipfs.go +++ b/common/pkgs/ioswitch/ops/ipfs.go @@ -6,8 +6,9 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/ipfs" "gitlink.org.cn/cloudream/common/pkgs/logger" - myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/common/utils/io2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) @@ -15,6 +16,7 @@ import ( type IPFSRead struct { Output ioswitch.StreamID `json:"output"` FileHash string `json:"fileHash"` + Option ipfs.ReadOption `json:"option"` } func (o *IPFSRead) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { @@ -30,13 +32,13 @@ func (o *IPFSRead) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { } defer stgglb.IPFSPool.Release(ipfsCli) - file, err := ipfsCli.OpenRead(o.FileHash) + file, err := ipfsCli.OpenRead(o.FileHash, o.Option) if err != nil { return fmt.Errorf("reading ipfs: %w", err) } fut := future.NewSetVoid() - file = myio.AfterReadClosedOnce(file, func(closer io.ReadCloser) { + file = io2.AfterReadClosedOnce(file, func(closer io.ReadCloser) { fut.SetVoid() }) diff --git a/common/pkgs/ioswitch/ops/join.go b/common/pkgs/ioswitch/ops/join.go index 5c1184a..0a7244e 100644 --- a/common/pkgs/ioswitch/ops/join.go +++ b/common/pkgs/ioswitch/ops/join.go @@ -5,7 +5,7 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" - myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) @@ -34,7 +34,7 @@ func (o *Join) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { fut := future.NewSetVoid() sw.StreamReady(planID, ioswitch.NewStream(o.OutputID, - myio.AfterReadClosedOnce(myio.Length(myio.Join(strReaders), o.Length), func(closer io.ReadCloser) { + io2.AfterReadClosedOnce(io2.Length(io2.Join(strReaders), o.Length), func(closer io.ReadCloser) { fut.SetVoid() }), ), diff --git a/common/pkgs/ioswitch/ops/length.go b/common/pkgs/ioswitch/ops/length.go index e1d12a2..fdeddea 100644 --- a/common/pkgs/ioswitch/ops/length.go +++ b/common/pkgs/ioswitch/ops/length.go @@ -5,7 +5,7 @@ import ( "io" "gitlink.org.cn/cloudream/common/pkgs/future" - myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) @@ -25,7 +25,7 @@ func (o *Length) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { fut := future.NewSetVoid() sw.StreamReady(planID, ioswitch.NewStream(o.OutputID, - myio.AfterReadClosedOnce(myio.Length(strs[0].Stream, o.Length), func(closer io.ReadCloser) { + io2.AfterReadClosedOnce(io2.Length(strs[0].Stream, o.Length), func(closer io.ReadCloser) { fut.SetVoid() }), ), diff --git a/common/pkgs/ioswitch/plans/agent_plan.go b/common/pkgs/ioswitch/plans/agent_plan.go index 9c75a8a..f5eaabf 100644 --- a/common/pkgs/ioswitch/plans/agent_plan.go +++ b/common/pkgs/ioswitch/plans/agent_plan.go @@ -1,6 +1,7 @@ package plans import ( + "gitlink.org.cn/cloudream/common/pkgs/ipfs" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops" @@ -59,7 +60,15 @@ func (s *AgentStream) GRPCSend(node cdssdk.Node) *AgentStream { return agtStr } -func (b *AgentPlanBuilder) IPFSRead(fileHash string) *AgentStream { +func (b *AgentPlanBuilder) IPFSRead(fileHash string, opts ...ipfs.ReadOption) *AgentStream { + opt := ipfs.ReadOption{ + Offset: 0, + Length: -1, + } + if len(opts) > 0 { + opt = opts[0] + } + agtStr := &AgentStream{ owner: b, info: b.owner.newStream(), @@ -68,6 +77,7 @@ func (b *AgentPlanBuilder) IPFSRead(fileHash string) *AgentStream { b.ops = append(b.ops, &ops.IPFSRead{ Output: agtStr.info.ID, FileHash: fileHash, + Option: opt, }) return agtStr diff --git a/common/pkgs/ioswitch/plans/executor.go b/common/pkgs/ioswitch/plans/executor.go index cc23209..856e643 100644 --- a/common/pkgs/ioswitch/plans/executor.go +++ b/common/pkgs/ioswitch/plans/executor.go @@ -9,7 +9,7 @@ import ( "sync/atomic" "gitlink.org.cn/cloudream/common/pkgs/future" - myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/common/utils/io2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" @@ -76,8 +76,15 @@ func Execute(plan ComposedPlan) (*Executor, error) { } func (e *Executor) SendStream(info *FromExecutorStream, stream io.Reader) error { - // TODO 根据地域选择IP - agtCli, err := stgglb.AgentRPCPool.Acquire(info.toNode.ExternalIP, info.toNode.ExternalGRPCPort) + // TODO 考虑不使用stgglb的Local + nodeIP := info.toNode.ExternalIP + grpcPort := info.toNode.ExternalGRPCPort + if info.toNode.LocationID == stgglb.Local.LocationID { + nodeIP = info.toNode.LocalIP + grpcPort = info.toNode.LocalGRPCPort + } + + agtCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort) if err != nil { return fmt.Errorf("new agent rpc client: %w", err) } @@ -87,8 +94,15 @@ func (e *Executor) SendStream(info *FromExecutorStream, stream io.Reader) error } func (e *Executor) ReadStream(info *ToExecutorStream) (io.ReadCloser, error) { - // TODO 根据地域选择IP - agtCli, err := stgglb.AgentRPCPool.Acquire(info.fromNode.ExternalIP, info.fromNode.ExternalGRPCPort) + // TODO 考虑不使用stgglb的Local + nodeIP := info.fromNode.ExternalIP + grpcPort := info.fromNode.ExternalGRPCPort + if info.fromNode.LocationID == stgglb.Local.LocationID { + nodeIP = info.fromNode.LocalIP + grpcPort = info.fromNode.LocalGRPCPort + } + + agtCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort) if err != nil { return nil, fmt.Errorf("new agent rpc client: %w", err) } @@ -98,7 +112,7 @@ func (e *Executor) ReadStream(info *ToExecutorStream) (io.ReadCloser, error) { return nil, err } - return myio.AfterReadClosed(str, func(closer io.ReadCloser) { + return io2.AfterReadClosed(str, func(closer io.ReadCloser) { stgglb.AgentRPCPool.Release(agtCli) }), nil } diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index 7b3c90f..a112365 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -64,12 +64,12 @@ type GetPackageObjectDetailsResp struct { Objects []stgmod.ObjectDetail `json:"objects"` } -func NewGetPackageObjectDetails(packageID cdssdk.PackageID) *GetPackageObjectDetails { +func ReqGetPackageObjectDetails(packageID cdssdk.PackageID) *GetPackageObjectDetails { return &GetPackageObjectDetails{ PackageID: packageID, } } -func NewGetPackageObjectDetailsResp(objects []stgmod.ObjectDetail) *GetPackageObjectDetailsResp { +func RespPackageObjectDetails(objects []stgmod.ObjectDetail) *GetPackageObjectDetailsResp { return &GetPackageObjectDetailsResp{ Objects: objects, } diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go index a5dd496..3b6e66c 100644 --- a/coordinator/internal/mq/object.go +++ b/coordinator/internal/mq/object.go @@ -63,7 +63,7 @@ func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) return nil, mq.Failed(errorcode.OperationFailed, "get package object block details failed") } - return mq.ReplyOK(coormq.NewGetPackageObjectDetailsResp(details)) + return mq.ReplyOK(coormq.RespPackageObjectDetails(details)) } func (svc *Service) GetObjectDetails(msg *coormq.GetObjectDetails) (*coormq.GetObjectDetailsResp, *mq.CodeMessage) { diff --git a/go.mod b/go.mod index aaeaa04..3c113bc 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,10 @@ go 1.20 replace gitlink.org.cn/cloudream/common v0.0.0 => ../common require ( - github.com/baohan10/reedsolomon v0.0.0-20230406042632-43574cac9fa7 github.com/gin-gonic/gin v1.9.1 github.com/go-sql-driver/mysql v1.7.1 - github.com/google/uuid v1.3.0 + github.com/google/uuid v1.3.1 + github.com/hashicorp/golang-lru/v2 v2.0.5 github.com/jedib0t/go-pretty/v6 v6.4.7 github.com/jmoiron/sqlx v1.3.5 github.com/klauspost/reedsolomon v1.11.8 @@ -43,8 +43,6 @@ require ( github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/go-cmp v0.5.9 // indirect - github.com/google/licensecheck v0.3.1 // indirect - github.com/google/safehtml v0.0.3-0.20211026203422-d6f0e11a5516 // indirect github.com/gopherjs/gopherjs v1.17.2 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect @@ -52,7 +50,6 @@ require ( github.com/ipfs/boxo v0.12.0 // indirect github.com/ipfs/go-cid v0.4.1 // indirect github.com/ipfs/go-ipfs-api v0.7.0 // indirect - github.com/jessevdk/go-flags v1.4.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/jtolds/gls v4.20.0+incompatible // indirect github.com/klauspost/cpuid/v2 v2.2.4 // indirect @@ -77,7 +74,6 @@ require ( github.com/multiformats/go-multihash v0.2.3 // indirect github.com/multiformats/go-multistream v0.4.1 // indirect github.com/multiformats/go-varint v0.0.7 // indirect - github.com/ofabry/go-callvis v0.7.0 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pkg/errors v0.9.1 // indirect @@ -101,18 +97,12 @@ require ( golang.org/x/arch v0.3.0 // indirect golang.org/x/crypto v0.22.0 // indirect golang.org/x/exp v0.0.0-20230519143937-03e91628a987 // indirect - golang.org/x/image v0.15.0 // indirect - golang.org/x/mod v0.17.0 // indirect - golang.org/x/net v0.24.0 // indirect - golang.org/x/pkgsite v0.0.0-20240405142909-b8abe0819782 // indirect - golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.19.0 // indirect - golang.org/x/text v0.14.0 // indirect - golang.org/x/tools v0.20.0 // indirect - golang.org/x/tools/go/pointer v0.1.0-deprecated // indirect - google.golang.org/genproto v0.0.0-20230530153820-e85fd2cbaebc // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect + golang.org/x/net v0.10.0 // indirect + golang.org/x/sys v0.8.0 // indirect + golang.org/x/text v0.9.0 // indirect + google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.1.7 // indirect rsc.io/markdown v0.0.0-20231214224604-88bb533a6020 // indirect diff --git a/go.sum b/go.sum index be88f69..246555e 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,10 @@ github.com/antonfisher/nested-logrus-formatter v1.3.1 h1:NFJIr+pzwv5QLHTPyKz9UMEoHck02Q9L0FP13b/xSbQ= github.com/antonfisher/nested-logrus-formatter v1.3.1/go.mod h1:6WTfyWFkBc9+zyBaKIqRrg/KwMqBbodBjgbHjDz7zjA= +<<<<<<< HEAD github.com/baohan10/reedsolomon v0.0.0-20230406042632-43574cac9fa7 h1:wcvD6enR///dFvb9cRodx5SGbPH4G4jPjw+aVIWkAKE= github.com/baohan10/reedsolomon v0.0.0-20230406042632-43574cac9fa7/go.mod h1:rAxMF6pVaFK/s6T4gGczvloccNbtwzuYaP2Y7W6flE8= +======= +>>>>>>> 770feaf2da11a3de00fa3ec57b16dc54ff31b288 github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= @@ -61,10 +64,13 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +<<<<<<< HEAD github.com/google/licensecheck v0.3.1 h1:QoxgoDkaeC4nFrtGN1jV7IPmDCHFNIVh54e5hSt6sPs= github.com/google/licensecheck v0.3.1/go.mod h1:ORkR35t/JjW+emNKtfJDII0zlciG9JgbT7SmsohlHmY= github.com/google/safehtml v0.0.3-0.20211026203422-d6f0e11a5516 h1:pSEdbeokt55L2hwtWo6A2k7u5SG08rmw0LhWEyrdWgk= github.com/google/safehtml v0.0.3-0.20211026203422-d6f0e11a5516/go.mod h1:L4KWwDsUJdECRAEpZoBn3O64bQaywRscowZjJAzjHnU= +======= +>>>>>>> 770feaf2da11a3de00fa3ec57b16dc54ff31b288 github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= @@ -74,6 +80,8 @@ github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/golang-lru/v2 v2.0.5 h1:wW7h1TG88eUIJ2i69gaE3uNVtEPIagzhGvHgwfx2Vm4= +github.com/hashicorp/golang-lru/v2 v2.0.5/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM= github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/ipfs/boxo v0.12.0 h1:AXHg/1ONZdRQHQLgG5JHsSC3XoE4DjCAMgK+asZvUcQ= @@ -153,8 +161,11 @@ github.com/multiformats/go-multistream v0.4.1 h1:rFy0Iiyn3YT0asivDUIR05leAdwZq3d github.com/multiformats/go-multistream v0.4.1/go.mod h1:Mz5eykRVAjJWckE2U78c6xqdtyNUEhKSM0Lwar2p77Q= github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= +<<<<<<< HEAD github.com/ofabry/go-callvis v0.7.0 h1:kh8TYgER49uZDlMrYviHchBs+I4n/SgiZXv45CVkqiE= github.com/ofabry/go-callvis v0.7.0/go.mod h1:z/1SpfLX72BjG8mgjy77/VWK5xJ9YBytCBnQeQnRObQ= +======= +>>>>>>> 770feaf2da11a3de00fa3ec57b16dc54ff31b288 github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= @@ -239,6 +250,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +<<<<<<< HEAD golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= golang.org/x/pkgsite v0.0.0-20240405142909-b8abe0819782 h1:LpBNDVFgFjnIZg+JzqKB2rSZCwV5o0NaYRZyAHBy8oI= @@ -248,6 +260,13 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +======= +golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +>>>>>>> 770feaf2da11a3de00fa3ec57b16dc54ff31b288 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -256,8 +275,13 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +<<<<<<< HEAD golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +======= +golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +>>>>>>> 770feaf2da11a3de00fa3ec57b16dc54ff31b288 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= diff --git a/log/agent.log b/log/agent.log index c61fe17..ccd00de 100644 --- a/log/agent.log +++ b/log/agent.log @@ -1341,3 +1341,16 @@ 2024-04-11 22:53:21 [DEBU] [:Collector] do testing 2024-04-12 01:42:33 [DEBU] [:Collector] do testing 2024-04-12 08:50:14 [DEBU] [:Collector] do testing +2024-04-12 08:53:21 [DEBU] [:Collector] do testing +2024-04-12 08:58:23 [DEBU] [:Collector] do testing +2024-04-12 09:03:23 [DEBU] [:Collector] do testing +2024-04-12 09:08:23 [DEBU] [:Collector] do testing +2024-04-12 09:13:23 [DEBU] [:Collector] do testing +2024-04-12 09:18:23 [DEBU] [:Collector] do testing +2024-04-12 09:23:23 [DEBU] [:Collector] do testing +2024-04-12 09:28:23 [DEBU] [:Collector] do testing +2024-04-12 09:33:23 [DEBU] [:Collector] do testing +2024-04-12 09:38:23 [DEBU] [:Collector] do testing +2024-04-12 09:43:23 [DEBU] [:Collector] do testing +2024-04-12 09:48:23 [DEBU] [:Collector] do testing +2024-04-12 09:53:23 [DEBU] [:Collector] do testing diff --git a/log/scanner.log b/log/scanner.log index 28f0fb6..5307e22 100644 --- a/log/scanner.log +++ b/log/scanner.log @@ -4948,3 +4948,76 @@ 2024-04-12 08:51:18 [DEBU] [Event:CheckPackage] end 2024-04-12 08:53:04 [DEBU] [TickEvent:BatchCleanPinned] begin 2024-04-12 08:53:04 [DEBU] [TickEvent:BatchCleanPinned] end +2024-04-12 08:54:40 [DEBU] [TickEvent:BatchCheckPackageRedundancy] begin +2024-04-12 08:54:40 [DEBU] [TickEvent:BatchCheckPackageRedundancy] end +2024-04-12 08:54:51 [DEBU] [TickEvent:BatchCheckAllStorage] begin +2024-04-12 08:54:51 [DEBU] [TickEvent:BatchCheckAllStorage] all storage checked, next time will start check at 0 +2024-04-12 08:54:51 [DEBU] [TickEvent:BatchCheckAllStorage] end +2024-04-12 08:54:51 [DEBU] [Event:AgentCheckStorage] begin with , StorageID: 1 +2024-04-12 08:54:51 [DEBU] [Event:AgentCheckStorage] end +2024-04-12 08:54:56 [DEBU] [TickEvent:BatchAllAgentCheckCache] begin +2024-04-12 08:54:56 [DEBU] [TickEvent:BatchAllAgentCheckCache] new check start, get all nodes +2024-04-12 08:54:56 [DEBU] [TickEvent:BatchAllAgentCheckCache] end +2024-04-12 08:54:56 [DEBU] [Event:AgentCheckCache] begin with , NodeID: 1 +2024-04-12 08:54:56 [WARN] [Event:AgentCheckCache] [NodeID:1] checking ipfs: requesting: sending message: publishing data: Exception (504) Reason: "channel/connection is not open" +2024-04-12 08:54:56 [DEBU] [Event:AgentCheckCache] end, time: 0s +2024-04-12 08:55:26 [DEBU] [TickEvent:CheckAgentState] begin +2024-04-12 08:55:26 [DEBU] [TickEvent:CheckAgentState] end +2024-04-12 08:55:26 [DEBU] [Event:AgentCheckState] begin with , NodeID: 1 +2024-04-12 08:55:26 [WARN] [Event:AgentCheckState] [NodeID:1] getting state: requesting: sending message: publishing data: Exception (504) Reason: "channel/connection is not open" +2024-04-12 08:55:26 [DEBU] [Event:AgentCheckState] end +2024-04-12 08:55:35 [DEBU] [TickEvent:BatchCheckAllPackage] begin +2024-04-12 08:55:35 [DEBU] [TickEvent:BatchCheckAllPackage] all package checked, next time will start check at 0 +2024-04-12 08:55:35 [DEBU] [TickEvent:BatchCheckAllPackage] end +2024-04-12 08:55:35 [DEBU] [Event:CheckPackage] begin with , len(PackageIDs): 1 +2024-04-12 08:55:35 [DEBU] [Event:CheckPackage] end +2024-04-12 08:58:06 [DEBU] [TickEvent:BatchCleanPinned] begin +2024-04-12 08:58:06 [DEBU] [TickEvent:BatchCleanPinned] end +2024-04-12 08:59:40 [DEBU] [TickEvent:BatchCheckPackageRedundancy] begin +2024-04-12 08:59:40 [DEBU] [TickEvent:BatchCheckPackageRedundancy] end +2024-04-12 08:59:51 [DEBU] [TickEvent:BatchCheckAllStorage] begin +2024-04-12 08:59:51 [DEBU] [TickEvent:BatchCheckAllStorage] all storage checked, next time will start check at 0 +2024-04-12 08:59:51 [DEBU] [TickEvent:BatchCheckAllStorage] end +2024-04-12 08:59:51 [DEBU] [Event:AgentCheckStorage] begin with , StorageID: 1 +2024-04-12 08:59:51 [DEBU] [Event:AgentCheckStorage] end +2024-04-12 08:59:56 [DEBU] [TickEvent:BatchAllAgentCheckCache] begin +2024-04-12 08:59:56 [DEBU] [TickEvent:BatchAllAgentCheckCache] new check start, get all nodes +2024-04-12 08:59:56 [DEBU] [TickEvent:BatchAllAgentCheckCache] end +2024-04-12 08:59:56 [DEBU] [Event:AgentCheckCache] begin with , NodeID: 1 +2024-04-12 08:59:56 [WARN] [Event:AgentCheckCache] [NodeID:1] checking ipfs: requesting: sending message: publishing data: Exception (504) Reason: "channel/connection is not open" +2024-04-12 08:59:56 [DEBU] [Event:AgentCheckCache] end, time: 0s +2024-04-12 09:00:26 [DEBU] [TickEvent:CheckAgentState] begin +2024-04-12 09:00:26 [DEBU] [TickEvent:CheckAgentState] end +2024-04-12 09:00:26 [DEBU] [Event:AgentCheckState] begin with , NodeID: 1 +2024-04-12 09:00:26 [WARN] [Event:AgentCheckState] [NodeID:1] getting state: requesting: sending message: publishing data: Exception (504) Reason: "channel/connection is not open" +2024-04-12 09:00:26 [DEBU] [Event:AgentCheckState] end +2024-04-12 09:00:35 [DEBU] [TickEvent:BatchCheckAllPackage] begin +2024-04-12 09:00:35 [DEBU] [TickEvent:BatchCheckAllPackage] all package checked, next time will start check at 0 +2024-04-12 09:00:35 [DEBU] [TickEvent:BatchCheckAllPackage] end +2024-04-12 09:00:35 [DEBU] [Event:CheckPackage] begin with , len(PackageIDs): 1 +2024-04-12 09:00:35 [DEBU] [Event:CheckPackage] end +2024-04-12 09:03:06 [DEBU] [TickEvent:BatchCleanPinned] begin +2024-04-12 09:03:06 [DEBU] [TickEvent:BatchCleanPinned] end +2024-04-12 09:04:40 [DEBU] [TickEvent:BatchCheckPackageRedundancy] begin +2024-04-12 09:04:40 [DEBU] [TickEvent:BatchCheckPackageRedundancy] end +2024-04-12 09:04:51 [DEBU] [TickEvent:BatchCheckAllStorage] begin +2024-04-12 09:04:51 [DEBU] [TickEvent:BatchCheckAllStorage] all storage checked, next time will start check at 0 +2024-04-12 09:04:51 [DEBU] [TickEvent:BatchCheckAllStorage] end +2024-04-12 09:04:51 [DEBU] [Event:AgentCheckStorage] begin with , StorageID: 1 +2024-04-12 09:04:51 [DEBU] [Event:AgentCheckStorage] end +2024-04-12 09:04:56 [DEBU] [TickEvent:BatchAllAgentCheckCache] begin +2024-04-12 09:04:56 [DEBU] [TickEvent:BatchAllAgentCheckCache] new check start, get all nodes +2024-04-12 09:04:56 [DEBU] [TickEvent:BatchAllAgentCheckCache] end +2024-04-12 09:04:56 [DEBU] [Event:AgentCheckCache] begin with , NodeID: 1 +2024-04-12 09:04:56 [WARN] [Event:AgentCheckCache] [NodeID:1] checking ipfs: requesting: sending message: publishing data: Exception (504) Reason: "channel/connection is not open" +2024-04-12 09:04:56 [DEBU] [Event:AgentCheckCache] end, time: 293.5µs +2024-04-12 09:05:26 [DEBU] [TickEvent:CheckAgentState] begin +2024-04-12 09:05:26 [DEBU] [TickEvent:CheckAgentState] end +2024-04-12 09:05:26 [DEBU] [Event:AgentCheckState] begin with , NodeID: 1 +2024-04-12 09:05:26 [WARN] [Event:AgentCheckState] [NodeID:1] getting state: requesting: sending message: publishing data: Exception (504) Reason: "channel/connection is not open" +2024-04-12 09:05:26 [DEBU] [Event:AgentCheckState] end +2024-04-12 09:05:35 [DEBU] [TickEvent:BatchCheckAllPackage] begin +2024-04-12 09:05:35 [DEBU] [TickEvent:BatchCheckAllPackage] all package checked, next time will start check at 0 +2024-04-12 09:05:35 [DEBU] [TickEvent:BatchCheckAllPackage] end +2024-04-12 09:05:35 [DEBU] [Event:CheckPackage] begin with , len(PackageIDs): 1 +2024-04-12 09:05:35 [DEBU] [Event:CheckPackage] end diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index 880aa26..ece0574 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -64,7 +64,7 @@ func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { } defer stgglb.CoordinatorMQPool.Release(coorCli) - getObjs, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.PackageID)) + getObjs, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(t.PackageID)) if err != nil { log.Warnf("getting package objects: %s", err.Error()) return diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go index 9b30fc0..c992e67 100644 --- a/scanner/internal/event/clean_pinned.go +++ b/scanner/internal/event/clean_pinned.go @@ -12,7 +12,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/lo2" - mymath "gitlink.org.cn/cloudream/common/utils/math" + "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/common/utils/sort2" "gitlink.org.cn/cloudream/storage/common/consts" stgglb "gitlink.org.cn/cloudream/storage/common/globals" @@ -58,7 +58,7 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { } defer stgglb.CoordinatorMQPool.Release(coorCli) - getObjs, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.PackageID)) + getObjs, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(t.PackageID)) if err != nil { log.Warnf("getting package objects: %s", err.Error()) return @@ -674,7 +674,7 @@ func (t *CleanPinned) calcMinAccessCost(state *annealingState) float64 { // 下面的if会在拿到k个块之后跳出循环,所以or多了块也没关系 gotBlocks.Or(tarNodeMp) // 但是算读取块的消耗时,不能多算,最多算读了k个块的消耗 - willGetBlocks := mymath.Min(gotBlocks.Weight()-curWeigth, state.object.minBlockCnt-curWeigth) + willGetBlocks := math2.Min(gotBlocks.Weight()-curWeigth, state.object.minBlockCnt-curWeigth) thisCost += float64(willGetBlocks) * float64(tar.Distance) if gotBlocks.Weight() >= state.object.minBlockCnt {