From 6d4f4a9ac41f244c8682b804eef378daba3380d1 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 12 Dec 2023 09:46:45 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=A3=80=E6=9F=A5=E5=86=97?= =?UTF-8?q?=E4=BD=99=E6=96=B9=E5=BC=8F=E7=9A=84=E5=AE=9A=E6=97=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/cmdline/scanner.go | 2 + client/internal/config/config.go | 13 +- common/assets/confs/client.config.json | 4 +- common/assets/confs/scanner.config.json | 2 +- common/pkgs/cmd/create_package.go | 4 +- common/pkgs/db/object.go | 36 +- common/pkgs/db/storage_package_log.go | 6 + common/pkgs/ioswitch/ops/ec.go | 8 +- common/pkgs/ioswitch/ops/length.go | 40 ++ common/pkgs/ioswitch/plans/agent_plan.go | 21 +- .../pkgs/iterator/download_object_iterator.go | 15 +- common/pkgs/mq/coordinator/node.go | 2 +- common/pkgs/mq/coordinator/object.go | 30 + common/pkgs/mq/coordinator/package.go | 16 +- common/pkgs/mq/coordinator/storage.go | 35 + .../scanner/event/check_package_redundancy.go | 18 + coordinator/internal/services/node.go | 23 +- coordinator/internal/services/object.go | 11 + coordinator/internal/services/package.go | 37 +- scanner/internal/config/config.go | 4 +- .../event/check_package_redundancy.go | 629 ++++++++++++++++++ scanner/internal/event/event.go | 6 +- scanner/internal/event/event_test.go | 76 +++ .../batch_check_package_redudancy.go | 48 ++ scanner/main.go | 2 + 25 files changed, 1044 insertions(+), 44 deletions(-) create mode 100644 common/pkgs/ioswitch/ops/length.go create mode 100644 common/pkgs/mq/scanner/event/check_package_redundancy.go create mode 100644 scanner/internal/event/check_package_redundancy.go create mode 100644 scanner/internal/event/event_test.go create mode 100644 scanner/internal/tickevent/batch_check_package_redudancy.go diff --git a/client/internal/cmdline/scanner.go b/client/internal/cmdline/scanner.go index 
18fc571..2c10629 100644 --- a/client/internal/cmdline/scanner.go +++ b/client/internal/cmdline/scanner.go @@ -37,5 +37,7 @@ func init() { parseScannerEventCmdTrie.MustAdd(scevt.NewCheckRepCount, myreflect.TypeNameOf[scevt.CheckRepCount]()) + parseScannerEventCmdTrie.MustAdd(scevt.NewCheckPackageRedundancy, myreflect.TypeNameOf[scevt.CheckPackageRedundancy]()) + commands.MustAdd(ScannerPostEvent, "scanner", "event") } diff --git a/client/internal/config/config.go b/client/internal/config/config.go index 107f7c1..c08d366 100644 --- a/client/internal/config/config.go +++ b/client/internal/config/config.go @@ -11,13 +11,12 @@ import ( ) type Config struct { - Local stgmodels.LocalMachineInfo `json:"local"` - AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"` - MaxRepCount int `json:"maxRepCount"` - Logger logger.Config `json:"logger"` - RabbitMQ stgmq.Config `json:"rabbitMQ"` - IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon - DistLock distlock.Config `json:"distlock"` + Local stgmodels.LocalMachineInfo `json:"local"` + AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"` + Logger logger.Config `json:"logger"` + RabbitMQ stgmq.Config `json:"rabbitMQ"` + IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon + DistLock distlock.Config `json:"distlock"` } var cfg Config diff --git a/common/assets/confs/client.config.json b/common/assets/confs/client.config.json index 81a9ea6..10e4265 100644 --- a/common/assets/confs/client.config.json +++ b/common/assets/confs/client.config.json @@ -1,12 +1,12 @@ { "local": { "localIP": "127.0.0.1", - "externalIP": "127.0.0.1" + "externalIP": "127.0.0.1", + "locationID": 1 }, "agentGRPC": { "port": 5010 }, - "maxRepCount": 10, "logger": { "output": "stdout", "level": "debug" diff --git a/common/assets/confs/scanner.config.json b/common/assets/confs/scanner.config.json index be1b154..4dbd650 100644 --- a/common/assets/confs/scanner.config.json +++ b/common/assets/confs/scanner.config.json @@ -1,5 +1,5 @@ { - 
"minAvailableRepProportion": 0.8, + "ecFileSizeThreshold": 104857600, "nodeUnavailableSeconds": 300, "logger": { "output": "file", diff --git a/common/pkgs/cmd/create_package.go b/common/pkgs/cmd/create_package.go index 78af19c..013b055 100644 --- a/common/pkgs/cmd/create_package.go +++ b/common/pkgs/cmd/create_package.go @@ -163,7 +163,7 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo var uploadRets []ObjectUploadResult //上传文件夹 - var adds []coormq.AddObjectInfo + var adds []coormq.AddObjectEntry for { objInfo, err := objectIter.MoveNext() if err == iterator.ErrNoMoreItem { @@ -190,7 +190,7 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo return fmt.Errorf("uploading object: %w", err) } - adds = append(adds, coormq.NewAddObjectInfo(objInfo.Path, objInfo.Size, fileHash, uploadNode.Node.NodeID)) + adds = append(adds, coormq.NewAddObjectEntry(objInfo.Path, objInfo.Size, fileHash, uploadNode.Node.NodeID)) return nil }() if err != nil { diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index 6020ceb..14d57d2 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -42,8 +42,8 @@ func (db *ObjectDB) Create(ctx SQLContext, packageID cdssdk.PackageID, path stri // 创建或者更新记录,返回值true代表是创建,false代表是更新 func (db *ObjectDB) CreateOrUpdate(ctx SQLContext, packageID cdssdk.PackageID, path string, size int64, fileHash string) (cdssdk.ObjectID, bool, error) { - // 首次上传Object时,默认使用Rep模式,即使是在更新一个已有的Object也是如此 - defRed := cdssdk.NewRepRedundancy() + // 首次上传Object时,默认不启用冗余,即使是在更新一个已有的Object也是如此 + defRed := cdssdk.NewNoneRedundancy() sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy) values(?,?,?,?,?) on duplicate key update Size = ?, FileHash = ?, Redundancy = ?" 
@@ -94,7 +94,7 @@ func (*ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ( return lo.Map(ret, func(o model.TempObject, idx int) model.Object { return o.ToObject() }), err } -func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, objs []coormq.AddObjectInfo) ([]cdssdk.ObjectID, error) { +func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, objs []coormq.AddObjectEntry) ([]cdssdk.ObjectID, error) { objIDs := make([]cdssdk.ObjectID, 0, len(objs)) for _, obj := range objs { // 创建对象的记录 @@ -128,6 +128,36 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, objs [] return objIDs, nil } +func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeObjectRedundancyEntry) error { + for _, obj := range objs { + _, err := ctx.Exec("update Object set Redundancy = ? where ObjectID = ?", obj.Redundancy, obj.ObjectID) + if err != nil { + return fmt.Errorf("updating object: %w", err) + } + + // Delete all existing block records of this object, then re-add them below + if err = db.ObjectBlock().DeleteObjectAll(ctx, obj.ObjectID); err != nil { + return fmt.Errorf("deleting all object block: %w", err) + } + + for _, block := range obj.Blocks { + // Create the block record supplied in the entry for the updated redundancy + err = db.ObjectBlock().Create(ctx, obj.ObjectID, block.Index, block.NodeID, block.FileHash) + if err != nil { + return fmt.Errorf("creating object block: %w", err) + } + + // Create the cache record (CreatePinned) for this block's file hash and node + err = db.Cache().CreatePinned(ctx, block.FileHash, block.NodeID, 0) + if err != nil { + return fmt.Errorf("creating cache: %w", err) + } + } + } + + return nil +} + func (*ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error { _, err := ctx.Exec("delete from Object where ObjectID in (?)", ids) return err diff --git a/common/pkgs/db/storage_package_log.go b/common/pkgs/db/storage_package_log.go index 6a1a166..b882f30 100644 --- a/common/pkgs/db/storage_package_log.go +++ b/common/pkgs/db/storage_package_log.go @@ -22,6 +22,12 @@ func (*StoragePackageLogDB) Get(ctx SQLContext, 
storageID cdssdk.StorageID, pack return ret, err } +func (*StoragePackageLogDB) GetByPackageID(ctx SQLContext, packageID cdssdk.PackageID) ([]model.StoragePackageLog, error) { + var ret []model.StoragePackageLog + err := sqlx.Select(ctx, &ret, "select * from StoragePackageLog where PackageID = ?", packageID) + return ret, err +} + func (*StoragePackageLogDB) Create(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID, createTime time.Time) error { _, err := ctx.Exec("insert into StoragePackageLog values(?,?,?,?)", storageID, packageID, userID, createTime) return err diff --git a/common/pkgs/ioswitch/ops/ec.go b/common/pkgs/ioswitch/ops/ec.go index 613bd2b..cc7cbfc 100644 --- a/common/pkgs/ioswitch/ops/ec.go +++ b/common/pkgs/ioswitch/ops/ec.go @@ -11,7 +11,7 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) -type ECCompute struct { +type ECReconstructAny struct { EC cdssdk.ECRedundancy `json:"ec"` InputIDs []ioswitch.StreamID `json:"inputIDs"` OutputIDs []ioswitch.StreamID `json:"outputIDs"` @@ -19,7 +19,7 @@ type ECCompute struct { OutputBlockIndexes []int `json:"outputBlockIndexes"` } -func (o *ECCompute) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { +func (o *ECReconstructAny) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { rs, err := ec.NewRs(o.EC.K, o.EC.N, o.EC.ChunkSize) if err != nil { return fmt.Errorf("new ec: %w", err) @@ -40,7 +40,7 @@ func (o *ECCompute) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { inputs = append(inputs, s.Stream) } - outputs := rs.ReconstructSome(inputs, o.InputBlockIndexes, o.OutputBlockIndexes) + outputs := rs.ReconstructAny(inputs, o.InputBlockIndexes, o.OutputBlockIndexes) wg := sync.WaitGroup{} for i, id := range o.OutputIDs { @@ -97,6 +97,6 @@ func (o *ECReconstruct) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) err } func init() { - OpUnion.AddT((*ECCompute)(nil)) + OpUnion.AddT((*ECReconstructAny)(nil)) 
OpUnion.AddT((*ECReconstruct)(nil)) } diff --git a/common/pkgs/ioswitch/ops/length.go b/common/pkgs/ioswitch/ops/length.go new file mode 100644 index 0000000..e1d12a2 --- /dev/null +++ b/common/pkgs/ioswitch/ops/length.go @@ -0,0 +1,40 @@ +package ops + +import ( + "context" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/future" + myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +type Length struct { + InputID ioswitch.StreamID `json:"inputID"` + OutputID ioswitch.StreamID `json:"outputID"` + Length int64 `json:"length"` +} + +func (o *Length) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + strs, err := sw.WaitStreams(planID, o.InputID) + if err != nil { + return err + } + defer strs[0].Stream.Close() + + fut := future.NewSetVoid() + sw.StreamReady(planID, + ioswitch.NewStream(o.OutputID, + myio.AfterReadClosedOnce(myio.Length(strs[0].Stream, o.Length), func(closer io.ReadCloser) { + fut.SetVoid() + }), + ), + ) + + fut.Wait(context.TODO()) + return nil +} + +func init() { + OpUnion.AddT((*Length)(nil)) +} diff --git a/common/pkgs/ioswitch/plans/agent_plan.go b/common/pkgs/ioswitch/plans/agent_plan.go index 8bdcf0e..f45f2e3 100644 --- a/common/pkgs/ioswitch/plans/agent_plan.go +++ b/common/pkgs/ioswitch/plans/agent_plan.go @@ -102,7 +102,7 @@ func (b *AgentStream) FileWrite(filePath string) { }) } -func (b *AgentPlanBuilder) ECCompute(ec cdssdk.ECRedundancy, inBlockIndexes []int, outBlockIndexes []int, streams ...*AgentStream) *MultiStream { +func (b *AgentPlanBuilder) ECReconstructAny(ec cdssdk.ECRedundancy, inBlockIndexes []int, outBlockIndexes []int, streams ...*AgentStream) *MultiStream { mstr := &MultiStream{} var inputStrIDs []ioswitch.StreamID @@ -111,7 +111,7 @@ func (b *AgentPlanBuilder) ECCompute(ec cdssdk.ECRedundancy, inBlockIndexes []in } var outputStrIDs []ioswitch.StreamID - for i := 0; i < ec.N-ec.K; i++ { + for i := 0; i < len(outBlockIndexes); i++ { info := 
b.owner.newStream() mstr.Streams = append(mstr.Streams, &AgentStream{ owner: b, @@ -120,7 +120,7 @@ func (b *AgentPlanBuilder) ECCompute(ec cdssdk.ECRedundancy, inBlockIndexes []in outputStrIDs = append(outputStrIDs, info.ID) } - b.ops = append(b.ops, &ops.ECCompute{ + b.ops = append(b.ops, &ops.ECReconstructAny{ EC: ec, InputIDs: inputStrIDs, OutputIDs: outputStrIDs, @@ -183,6 +183,21 @@ func (b *AgentStream) ChunkedSplit(chunkSize int, streamCount int, paddingZeros return mstr } +func (s *AgentStream) Length(length int64) *AgentStream { + agtStr := &AgentStream{ + owner: s.owner, + info: s.owner.owner.newStream(), + } + + s.owner.ops = append(s.owner.ops, &ops.Length{ + InputID: s.info.ID, + OutputID: agtStr.info.ID, + Length: length, + }) + + return agtStr +} + func (s *AgentStream) ToExecutor() *ToExecutorStream { return &ToExecutorStream{ info: s.info, diff --git a/common/pkgs/iterator/download_object_iterator.go b/common/pkgs/iterator/download_object_iterator.go index 7683bd8..445504f 100644 --- a/common/pkgs/iterator/download_object_iterator.go +++ b/common/pkgs/iterator/download_object_iterator.go @@ -72,8 +72,19 @@ func (iter *DownloadObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloa obj := iter.objectDetails[iter.currentIndex] switch red := obj.Object.Redundancy.(type) { + case *cdssdk.NoneRedundancy: + reader, err := iter.downloadNoneOrRepObject(coorCli, iter.downloadCtx, obj) + if err != nil { + return nil, fmt.Errorf("downloading object: %w", err) + } + + return &IterDownloadingObject{ + Object: obj.Object, + File: reader, + }, nil + case *cdssdk.RepRedundancy: - reader, err := iter.downloadRepObject(coorCli, iter.downloadCtx, obj, red) + reader, err := iter.downloadNoneOrRepObject(coorCli, iter.downloadCtx, obj) if err != nil { return nil, fmt.Errorf("downloading rep object: %w", err) } @@ -116,7 +127,7 @@ func (i *DownloadObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) return entries[rand.Intn(len(entries))] } -func (iter 
*DownloadObjectIterator) downloadRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail, repRed *cdssdk.RepRedundancy) (io.ReadCloser, error) { +func (iter *DownloadObjectIterator) downloadNoneOrRepObject(coorCli *coormq.Client, ctx *DownloadContext, obj stgmodels.ObjectDetail) (io.ReadCloser, error) { //采取直接读,优先选内网节点 var chosenNodes []DownloadNodeInfo for i := range obj.Blocks { diff --git a/common/pkgs/mq/coordinator/node.go b/common/pkgs/mq/coordinator/node.go index dd3b042..7285f83 100644 --- a/common/pkgs/mq/coordinator/node.go +++ b/common/pkgs/mq/coordinator/node.go @@ -38,7 +38,7 @@ func (client *Client) GetUserNodes(msg *GetUserNodes) (*GetUserNodesResp, error) return mq.Request(Service.GetUserNodes, client.rabbitCli, msg) } -// 获取指定节点的信息 +// 获取指定节点的信息。如果NodeIDs为nil,则返回所有Node var _ = Register(Service.GetNodes) type GetNodes struct { diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index 780e2af..543d4f4 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -12,6 +12,8 @@ type ObjectService interface { GetPackageObjects(msg *GetPackageObjects) (*GetPackageObjectsResp, *mq.CodeMessage) GetPackageObjectDetails(msg *GetPackageObjectDetails) (*GetPackageObjectDetailsResp, *mq.CodeMessage) + + ChangeObjectRedundancy(msg *ChangeObjectRedundancy) (*ChangeObjectRedundancyResp, *mq.CodeMessage) } // 查询Package中的所有Object,返回的Objects会按照ObjectID升序 @@ -67,3 +69,31 @@ func NewGetPackageObjectDetailsResp(objects []stgmod.ObjectDetail) *GetPackageOb func (client *Client) GetPackageObjectDetails(msg *GetPackageObjectDetails) (*GetPackageObjectDetailsResp, error) { return mq.Request(Service.GetPackageObjectDetails, client.rabbitCli, msg) } + +// 更新Object的冗余方式 +var _ = Register(Service.ChangeObjectRedundancy) + +type ChangeObjectRedundancy struct { + mq.MessageBodyBase + Entries []ChangeObjectRedundancyEntry `json:"entries"` +} +type ChangeObjectRedundancyResp struct { 
+ mq.MessageBodyBase +} +type ChangeObjectRedundancyEntry struct { + ObjectID cdssdk.ObjectID `json:"objectID"` + Redundancy cdssdk.Redundancy `json:"redundancy"` + Blocks []stgmod.ObjectBlock `json:"blocks"` +} + +func ReqChangeObjectRedundancy(entries []ChangeObjectRedundancyEntry) *ChangeObjectRedundancy { + return &ChangeObjectRedundancy{ + Entries: entries, + } +} +func RespChangeObjectRedundancy() *ChangeObjectRedundancyResp { + return &ChangeObjectRedundancyResp{} +} +func (client *Client) ChangeObjectRedundancy(msg *ChangeObjectRedundancy) (*ChangeObjectRedundancyResp, error) { + return mq.Request(Service.ChangeObjectRedundancy, client.rabbitCli, msg) +} diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index a662edb..52c51f6 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -12,7 +12,7 @@ type PackageService interface { CreatePackage(msg *CreatePackage) (*CreatePackageResp, *mq.CodeMessage) - UpdateECPackage(msg *UpdatePackage) (*UpdatePackageResp, *mq.CodeMessage) + UpdatePackage(msg *UpdatePackage) (*UpdatePackageResp, *mq.CodeMessage) DeletePackage(msg *DeletePackage) (*DeletePackageResp, *mq.CodeMessage) @@ -80,25 +80,25 @@ func (client *Client) CreatePackage(msg *CreatePackage) (*CreatePackageResp, err } // 更新EC备份模式的Package -var _ = Register(Service.UpdateECPackage) +var _ = Register(Service.UpdatePackage) type UpdatePackage struct { mq.MessageBodyBase PackageID cdssdk.PackageID `json:"packageID"` - Adds []AddObjectInfo `json:"objects"` + Adds []AddObjectEntry `json:"adds"` Deletes []cdssdk.ObjectID `json:"deletes"` } type UpdatePackageResp struct { mq.MessageBodyBase } -type AddObjectInfo struct { +type AddObjectEntry struct { Path string `json:"path"` Size int64 `json:"size,string"` FileHash string `json:"fileHash"` NodeID cdssdk.NodeID `json:"nodeID"` } -func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectInfo, deletes []cdssdk.ObjectID) 
*UpdatePackage { +func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectEntry, deletes []cdssdk.ObjectID) *UpdatePackage { return &UpdatePackage{ PackageID: packageID, Adds: adds, @@ -108,8 +108,8 @@ func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectInfo, deletes func NewUpdatePackageResp() *UpdatePackageResp { return &UpdatePackageResp{} } -func NewAddObjectInfo(path string, size int64, fileHash string, nodeIDs cdssdk.NodeID) AddObjectInfo { - return AddObjectInfo{ +func NewAddObjectEntry(path string, size int64, fileHash string, nodeIDs cdssdk.NodeID) AddObjectEntry { + return AddObjectEntry{ Path: path, Size: size, FileHash: fileHash, @@ -117,7 +117,7 @@ func NewAddObjectInfo(path string, size int64, fileHash string, nodeIDs cdssdk.N } } func (client *Client) UpdateECPackage(msg *UpdatePackage) (*UpdatePackageResp, error) { - return mq.Request(Service.UpdateECPackage, client.rabbitCli, msg) + return mq.Request(Service.UpdatePackage, client.rabbitCli, msg) } // 删除对象 diff --git a/common/pkgs/mq/coordinator/storage.go b/common/pkgs/mq/coordinator/storage.go index afdc145..a92169a 100644 --- a/common/pkgs/mq/coordinator/storage.go +++ b/common/pkgs/mq/coordinator/storage.go @@ -1,6 +1,8 @@ package coordinator import ( + "time" + "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" @@ -10,6 +12,8 @@ type StorageService interface { GetStorageInfo(msg *GetStorageInfo) (*GetStorageInfoResp, *mq.CodeMessage) StoragePackageLoaded(msg *StoragePackageLoaded) (*StoragePackageLoadedResp, *mq.CodeMessage) + + GetPackageLoadLogDetails(msg *GetPackageLoadLogDetails) (*GetPackageLoadLogDetailsResp, *mq.CodeMessage) } // 获取Storage信息 @@ -72,3 +76,34 @@ func NewStoragePackageLoadedResp() *StoragePackageLoadedResp { func (client *Client) StoragePackageLoaded(msg *StoragePackageLoaded) (*StoragePackageLoadedResp, error) { return 
mq.Request(Service.StoragePackageLoaded, client.rabbitCli, msg) } + +// Query the load log records of a Package +var _ = Register(Service.GetPackageLoadLogDetails) + +type GetPackageLoadLogDetails struct { + mq.MessageBodyBase + PackageID cdssdk.PackageID `json:"packageID"` +} +type GetPackageLoadLogDetailsResp struct { + mq.MessageBodyBase + Logs []PackageLoadLogDetail `json:"logs"` +} +type PackageLoadLogDetail struct { + Storage model.Storage `json:"storage"` + UserID cdssdk.UserID `json:"userID"` + CreateTime time.Time `json:"createTime"` +} + +func ReqGetPackageLoadLogDetails(packageID cdssdk.PackageID) *GetPackageLoadLogDetails { + return &GetPackageLoadLogDetails{ + PackageID: packageID, + } +} +func RespGetPackageLoadLogDetails(logs []PackageLoadLogDetail) *GetPackageLoadLogDetailsResp { + return &GetPackageLoadLogDetailsResp{ + Logs: logs, + } +} +func (client *Client) GetPackageLoadLogDetails(msg *GetPackageLoadLogDetails) (*GetPackageLoadLogDetailsResp, error) { + return mq.Request(Service.GetPackageLoadLogDetails, client.rabbitCli, msg) +} diff --git a/common/pkgs/mq/scanner/event/check_package_redundancy.go b/common/pkgs/mq/scanner/event/check_package_redundancy.go new file mode 100644 index 0000000..1c7687a --- /dev/null +++ b/common/pkgs/mq/scanner/event/check_package_redundancy.go @@ -0,0 +1,18 @@ +package event + +import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + +type CheckPackageRedundancy struct { + EventBase + PackageID cdssdk.PackageID `json:"packageIDs"` // NOTE(review): JSON key is plural "packageIDs" but the field holds a single PackageID - confirm whether "packageID" was intended +} + +func NewCheckPackageRedundancy(packageID cdssdk.PackageID) *CheckPackageRedundancy { + return &CheckPackageRedundancy{ + PackageID: packageID, + } +} + +func init() { + Register[*CheckPackageRedundancy]() +} diff --git a/coordinator/internal/services/node.go b/coordinator/internal/services/node.go index 9d7eaed..3028d69 100644 --- a/coordinator/internal/services/node.go +++ b/coordinator/internal/services/node.go @@ -21,15 +21,26 @@ func (svc *Service) GetUserNodes(msg *coormq.GetUserNodes) 
(*coormq.GetUserNodes func (svc *Service) GetNodes(msg *coormq.GetNodes) (*coormq.GetNodesResp, *mq.CodeMessage) { var nodes []model.Node - for _, id := range msg.NodeIDs { - node, err := svc.db.Node().GetByID(svc.db.SQLCtx(), id) + + if msg.NodeIDs == nil { + var err error + nodes, err = svc.db.Node().GetAllNodes(svc.db.SQLCtx()) if err != nil { - logger.WithField("NodeID", id). - Warnf("query node failed, err: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "query node failed") + logger.Warnf("getting all nodes: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "get all node failed") } - nodes = append(nodes, node) + } else { + for _, id := range msg.NodeIDs { + node, err := svc.db.Node().GetByID(svc.db.SQLCtx(), id) + if err != nil { + logger.WithField("NodeID", id). + Warnf("query node failed, err: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "query node failed") + } + + nodes = append(nodes, node) + } } return mq.ReplyOK(coormq.NewGetNodesResp(nodes)) diff --git a/coordinator/internal/services/object.go b/coordinator/internal/services/object.go index f3f9674..2a73045 100644 --- a/coordinator/internal/services/object.go +++ b/coordinator/internal/services/object.go @@ -31,3 +31,14 @@ func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) return mq.ReplyOK(coormq.NewGetPackageObjectDetailsResp(data)) } + +func (svc *Service) ChangeObjectRedundancy(msg *coormq.ChangeObjectRedundancy) (*coormq.ChangeObjectRedundancyResp, *mq.CodeMessage) { + err := svc.db.Object().BatchUpdateRedundancy(svc.db.SQLCtx(), msg.Entries) + if err != nil { + logger.Warnf("batch updating redundancy: %s", err.Error()) + + return nil, mq.Failed(errorcode.OperationFailed, "batch update redundancy failed") + } + + return mq.ReplyOK(coormq.RespChangeObjectRedundancy()) +} diff --git a/coordinator/internal/services/package.go b/coordinator/internal/services/package.go index baa7b10..b761841 100644 --- 
a/coordinator/internal/services/package.go +++ b/coordinator/internal/services/package.go @@ -10,6 +10,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" @@ -45,7 +46,7 @@ func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePack return mq.ReplyOK(coormq.NewCreatePackageResp(pkgID)) } -func (svc *Service) UpdateECPackage(msg *coormq.UpdatePackage) (*coormq.UpdatePackageResp, *mq.CodeMessage) { +func (svc *Service) UpdatePackage(msg *coormq.UpdatePackage) (*coormq.UpdatePackageResp, *mq.CodeMessage) { _, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) if err != nil { logger.WithField("PackageID", msg.PackageID). @@ -207,3 +208,37 @@ func (svc *Service) GetPackageLoadedNodes(msg *coormq.GetPackageLoadedNodes) (*c return mq.ReplyOK(coormq.NewGetPackageLoadedNodesResp(nodeIDs)) } + +func (svc *Service) GetPackageLoadLogDetails(msg *coormq.GetPackageLoadLogDetails) (*coormq.GetPackageLoadLogDetailsResp, *mq.CodeMessage) { + var logs []coormq.PackageLoadLogDetail + rawLogs, err := svc.db.StoragePackageLog().GetByPackageID(svc.db.SQLCtx(), msg.PackageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). + Warnf("getting storage package log: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "get storage package log failed") + } + + stgs := make(map[cdssdk.StorageID]model.Storage) + + for _, raw := range rawLogs { + stg, ok := stgs[raw.StorageID] + if !ok { + stg, err = svc.db.Storage().GetByID(svc.db.SQLCtx(), raw.StorageID) + if err != nil { + logger.WithField("PackageID", msg.PackageID). 
+ Warnf("getting storage: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "get storage failed") + } + + stgs[raw.StorageID] = stg + } + + logs = append(logs, coormq.PackageLoadLogDetail{ + Storage: stg, + UserID: raw.UserID, + CreateTime: raw.CreateTime, + }) + } + + return mq.ReplyOK(coormq.RespGetPackageLoadLogDetails(logs)) +} diff --git a/scanner/internal/config/config.go b/scanner/internal/config/config.go index 9e66651..6fc6830 100644 --- a/scanner/internal/config/config.go +++ b/scanner/internal/config/config.go @@ -9,8 +9,8 @@ import ( ) type Config struct { - MinAvailableRepProportion float32 `json:"minAvailableRepProportion"` // 可用的备份至少要占所有备份的比例,向上去整 - NodeUnavailableSeconds int `json:"nodeUnavailableSeconds"` // 如果节点上次上报时间超过这个值,则认为节点已经不可用 + ECFileSizeThreshold int64 `json:"ecFileSizeThreshold"` + NodeUnavailableSeconds int `json:"nodeUnavailableSeconds"` // 如果节点上次上报时间超过这个值,则认为节点已经不可用 Logger log.Config `json:"logger"` DB db.Config `json:"db"` diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go new file mode 100644 index 0000000..b9d12ba --- /dev/null +++ b/scanner/internal/event/check_package_redundancy.go @@ -0,0 +1,629 @@ +package event + +import ( + "fmt" + "time" + + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/sort" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans" + agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" + scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" + "gitlink.org.cn/cloudream/storage/scanner/internal/config" +) + +const ( + monthHours = 30 
* 24 + yearHours = 365 * 24 +) + +type CheckPackageRedundancy struct { + *scevt.CheckPackageRedundancy +} + +func NewCheckPackageRedundancy(evt *scevt.CheckPackageRedundancy) *CheckPackageRedundancy { + return &CheckPackageRedundancy{ + CheckPackageRedundancy: evt, + } +} + +type NodeLoadInfo struct { + Node model.Node + LoadsRecentMonth int + LoadsRecentYear int +} + +func (t *CheckPackageRedundancy) TryMerge(other Event) bool { + event, ok := other.(*CheckPackageRedundancy) + if !ok { + return false + } + + return event.PackageID == t.PackageID +} + +func (t *CheckPackageRedundancy) Execute(execCtx ExecuteContext) { + log := logger.WithType[CheckPackageRedundancy]("Event") + log.Debugf("begin with %v", logger.FormatStruct(t.CheckPackageRedundancy)) + defer log.Debugf("end") + + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + log.Warnf("new coordinator client: %s", err.Error()) + return + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + getObjs, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.PackageID)) + if err != nil { + log.Warnf("getting package objects: %s", err.Error()) + return + } + + getLogs, err := coorCli.GetPackageLoadLogDetails(coormq.ReqGetPackageLoadLogDetails(t.PackageID)) + if err != nil { + log.Warnf("getting package load log details: %s", err.Error()) + return + } + + getNodes, err := coorCli.GetNodes(coormq.NewGetNodes(nil)) + if err != nil { + log.Warnf("getting all nodes: %s", err.Error()) + return + } + + if len(getNodes.Nodes) == 0 { + log.Warnf("no available nodes") + return + } + + allNodes := make(map[cdssdk.NodeID]*NodeLoadInfo) + for _, node := range getNodes.Nodes { + allNodes[node.NodeID] = &NodeLoadInfo{ + Node: node, + } + } + + for _, log := range getLogs.Logs { + info, ok := allNodes[log.Storage.NodeID] + if !ok { + continue + } + + sinceNow := time.Since(log.CreateTime) + if sinceNow.Hours() < monthHours { + info.LoadsRecentMonth++ + } else if sinceNow.Hours() < yearHours 
{ + info.LoadsRecentYear++ + } + } + + var changedObjects []coormq.ChangeObjectRedundancyEntry + + defRep := cdssdk.DefaultRepRedundancy + defEC := cdssdk.DefaultECRedundancy + + newRepNodes := t.chooseNewNodesForRep(&defRep, allNodes) + newECNodes := t.chooseNewNodesForEC(&defEC, allNodes) + + for _, obj := range getObjs.Objects { + var entry *coormq.ChangeObjectRedundancyEntry + var err error + + shouldUseEC := obj.Object.Size > config.Cfg().ECFileSizeThreshold + + switch red := obj.Object.Redundancy.(type) { + case *cdssdk.NoneRedundancy: + if shouldUseEC { + log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> ec") + entry, err = t.noneToEC(obj, &defEC, newECNodes) + } else { + log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: none -> rep") + entry, err = t.noneToRep(obj, &defRep, newRepNodes) + } + + case *cdssdk.RepRedundancy: + if shouldUseEC { + log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: rep -> ec") + entry, err = t.repToEC(obj, &defEC, newECNodes) + } else { + uploadNodes := t.rechooseNodesForRep(obj, red, allNodes) + entry, err = t.repToRep(obj, &defRep, uploadNodes) + } + + case *cdssdk.ECRedundancy: + if shouldUseEC { + uploadNodes := t.rechooseNodesForEC(obj, red, allNodes) + entry, err = t.ecToEC(obj, red, &defEC, uploadNodes) + } else { + log.WithField("ObjectID", obj.Object.ObjectID).Debugf("redundancy: ec -> rep") + entry, err = t.ecToRep(obj, red, &defRep, newRepNodes) + } + } + + if entry != nil { + changedObjects = append(changedObjects, *entry) + } + + if err != nil { + log.WithField("ObjectID", obj.Object.ObjectID).Warnf("%s, its redundancy wont be changed", err.Error()) + } + } + + if len(changedObjects) == 0 { + return + } + + _, err = coorCli.ChangeObjectRedundancy(coormq.ReqChangeObjectRedundancy(changedObjects)) + if err != nil { + log.Warnf("requesting to change object redundancy: %s", err.Error()) + return + } +} + +func (t *CheckPackageRedundancy) chooseNewNodesForRep(red 
*cdssdk.RepRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo { + sortedNodes := sort.Sort(lo.Values(allNodes), func(left *NodeLoadInfo, right *NodeLoadInfo) int { + dm := right.LoadsRecentMonth - left.LoadsRecentMonth + if dm != 0 { + return dm + } + + return right.LoadsRecentYear - left.LoadsRecentYear + }) + + return t.chooseSoManyNodes(red.RepCount, sortedNodes) +} + +func (t *CheckPackageRedundancy) chooseNewNodesForEC(red *cdssdk.ECRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo { + sortedNodes := sort.Sort(lo.Values(allNodes), func(left *NodeLoadInfo, right *NodeLoadInfo) int { + dm := right.LoadsRecentMonth - left.LoadsRecentMonth + if dm != 0 { + return dm + } + + return right.LoadsRecentYear - left.LoadsRecentYear + }) + + return t.chooseSoManyNodes(red.N, sortedNodes) +} + +func (t *CheckPackageRedundancy) rechooseNodesForRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo { + type rechooseNode struct { + *NodeLoadInfo + CachedBlockIndex int + } + + var rechooseNodes []*rechooseNode + for _, node := range allNodes { + cachedBlockIndex := -1 + for _, block := range obj.Blocks { + if lo.Contains(block.CachedNodeIDs, node.Node.NodeID) { + cachedBlockIndex = block.Index + break + } + } + + rechooseNodes = append(rechooseNodes, &rechooseNode{ + NodeLoadInfo: node, + CachedBlockIndex: cachedBlockIndex, + }) + } + + sortedNodes := sort.Sort(rechooseNodes, func(left *rechooseNode, right *rechooseNode) int { + dm := right.LoadsRecentMonth - left.LoadsRecentMonth + if dm != 0 { + return dm + } + + // 已经缓存了文件块的节点优先选择 + v := sort.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1) + if v != 0 { + return v + } + + return right.LoadsRecentYear - left.LoadsRecentYear + }) + + return t.chooseSoManyNodes(red.RepCount, lo.Map(sortedNodes, func(node *rechooseNode, idx int) *NodeLoadInfo { return node.NodeLoadInfo })) +} + +func (t *CheckPackageRedundancy) 
rechooseNodesForEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, allNodes map[cdssdk.NodeID]*NodeLoadInfo) []*NodeLoadInfo { + type rechooseNode struct { + *NodeLoadInfo + CachedBlockIndex int + } + + var rechooseNodes []*rechooseNode + for _, node := range allNodes { + cachedBlockIndex := -1 + for _, block := range obj.Blocks { + if lo.Contains(block.CachedNodeIDs, node.Node.NodeID) { + cachedBlockIndex = block.Index + break + } + } + + rechooseNodes = append(rechooseNodes, &rechooseNode{ + NodeLoadInfo: node, + CachedBlockIndex: cachedBlockIndex, + }) + } + + sortedNodes := sort.Sort(rechooseNodes, func(left *rechooseNode, right *rechooseNode) int { + dm := right.LoadsRecentMonth - left.LoadsRecentMonth + if dm != 0 { + return dm + } + + // 已经缓存了文件块的节点优先选择 + v := sort.CmpBool(right.CachedBlockIndex > -1, left.CachedBlockIndex > -1) + if v != 0 { + return v + } + + return right.LoadsRecentYear - left.LoadsRecentYear + }) + + // TODO 可以考虑选择已有块的节点时,能依然按照Index顺序选择 + return t.chooseSoManyNodes(red.N, lo.Map(sortedNodes, func(node *rechooseNode, idx int) *NodeLoadInfo { return node.NodeLoadInfo })) +} + +func (t *CheckPackageRedundancy) chooseSoManyNodes(count int, nodes []*NodeLoadInfo) []*NodeLoadInfo { + repeateCount := (count + len(nodes) - 1) / len(nodes) + extedNodes := make([]*NodeLoadInfo, repeateCount*len(nodes)) + + // 使用复制的方式将节点数扩充到要求的数量 + // 复制之后的结构:ABCD -> AAABBBCCCDDD + for p := 0; p < repeateCount; p++ { + for i, node := range nodes { + putIdx := i*repeateCount + p + extedNodes[putIdx] = node + } + } + extedNodes = extedNodes[:count] + + var chosen []*NodeLoadInfo + for len(chosen) < count { + // 在每一轮内都选不同地区的节点,如果节点数不够,那么就再来一轮 + chosenLocations := make(map[cdssdk.LocationID]bool) + for i, node := range extedNodes { + if node == nil { + continue + } + + if chosenLocations[node.Node.LocationID] { + continue + } + + chosen = append(chosen, node) + chosenLocations[node.Node.LocationID] = true + extedNodes[i] = nil + } + } + + return chosen +} + +func (t 
*CheckPackageRedundancy) noneToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { + if len(obj.CachedNodeIDs) == 0 { + return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep") + } + + // 如果选择的备份节点都是同一个,那么就只要上传一次 + uploadNodes = lo.UniqBy(uploadNodes, func(item *NodeLoadInfo) cdssdk.NodeID { return item.Node.NodeID }) + + var blocks []stgmod.ObjectBlock + for _, node := range uploadNodes { + err := t.pinObject(node.Node.NodeID, obj.Object.FileHash) + if err != nil { + return nil, err + } + blocks = append(blocks, stgmod.ObjectBlock{ + ObjectID: obj.Object.ObjectID, + Index: 0, + NodeID: node.Node.NodeID, + FileHash: obj.Object.FileHash, + }) + } + + return &coormq.ChangeObjectRedundancyEntry{ + ObjectID: obj.Object.ObjectID, + Redundancy: red, + Blocks: blocks, + }, nil +} + +func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + if len(obj.CachedNodeIDs) == 0 { + return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to ec") + } + + getNodes, err := coorCli.GetNodes(coormq.NewGetNodes([]cdssdk.NodeID{obj.CachedNodeIDs[0]})) + if err != nil { + return nil, fmt.Errorf("requesting to get nodes: %w", err) + } + + planBlder := plans.NewPlanBuilder() + inputStrs := planBlder.AtAgent(getNodes.Nodes[0]).IPFSRead(obj.Object.FileHash).ChunkedSplit(red.ChunkSize, red.K, true) + outputStrs := planBlder.AtAgent(getNodes.Nodes[0]).ECReconstructAny(*red, lo.Range(red.K), lo.Range(red.N), inputStrs.Streams...) 
+ for i := 0; i < red.N; i++ { + outputStrs.Stream(i).GRPCSend(uploadNodes[i].Node).IPFSWrite(fmt.Sprintf("%d", i)) + } + plan, err := planBlder.Build() + if err != nil { + return nil, fmt.Errorf("building io plan: %w", err) + } + + exec, err := plans.Execute(*plan) + if err != nil { + return nil, fmt.Errorf("executing io plan: %w", err) + } + + ioRet, err := exec.Wait() + if err != nil { + return nil, fmt.Errorf("executing io plan: %w", err) + } + + var blocks []stgmod.ObjectBlock + for i := 0; i < red.N; i++ { + blocks = append(blocks, stgmod.ObjectBlock{ + ObjectID: obj.Object.ObjectID, + Index: i, + NodeID: uploadNodes[i].Node.NodeID, + FileHash: ioRet.ResultValues[fmt.Sprintf("%d", i)].(string), + }) + } + + return &coormq.ChangeObjectRedundancyEntry{ + ObjectID: obj.Object.ObjectID, + Redundancy: red, + Blocks: blocks, + }, nil +} + +func (t *CheckPackageRedundancy) repToRep(obj stgmod.ObjectDetail, red *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { + if len(obj.CachedNodeIDs) == 0 { + return nil, fmt.Errorf("object is not cached on any nodes, cannot change its redundancy to rep") + } + + // 如果选择的备份节点都是同一个,那么就只要上传一次 + uploadNodes = lo.UniqBy(uploadNodes, func(item *NodeLoadInfo) cdssdk.NodeID { return item.Node.NodeID }) + + for _, node := range uploadNodes { + err := t.pinObject(node.Node.NodeID, obj.Object.FileHash) + if err != nil { + logger.WithField("ObjectID", obj.Object.ObjectID). 
+ Warn(err.Error()) + return nil, err + } + } + + var blocks []stgmod.ObjectBlock + for _, node := range uploadNodes { + // 由于更新冗余方式会删除所有Block记录然后重新填充, + // 所以即使是节点跳过了上传,也需要为它添加一条Block记录 + blocks = append(blocks, stgmod.ObjectBlock{ + ObjectID: obj.Object.ObjectID, + Index: 0, + NodeID: node.Node.NodeID, + FileHash: obj.Object.FileHash, + }) + } + + return &coormq.ChangeObjectRedundancyEntry{ + ObjectID: obj.Object.ObjectID, + Redundancy: red, + Blocks: blocks, + }, nil +} + +func (t *CheckPackageRedundancy) repToEC(obj stgmod.ObjectDetail, red *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { + return t.noneToEC(obj, red, uploadNodes) +} + +func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.RepRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + var chosenBlocks []stgmod.ObjectBlockDetail + var chosenBlockIndexes []int + for _, block := range obj.Blocks { + if len(block.CachedNodeIDs) > 0 { + chosenBlocks = append(chosenBlocks, block) + chosenBlockIndexes = append(chosenBlockIndexes, block.Index) + } + + if len(chosenBlocks) == srcRed.K { + break + } + } + + if len(chosenBlocks) < srcRed.K { + return nil, fmt.Errorf("no enough blocks to reconstruct the original file data") + } + + // 如果选择的备份节点都是同一个,那么就只要上传一次 + uploadNodes = lo.UniqBy(uploadNodes, func(item *NodeLoadInfo) cdssdk.NodeID { return item.Node.NodeID }) + + // 每个被选节点都在自己节点上重建原始数据 + planBlder := plans.NewPlanBuilder() + for i := range uploadNodes { + tarNode := planBlder.AtAgent(uploadNodes[i].Node) + + var inputs []*plans.AgentStream + for _, block := range chosenBlocks { + inputs = append(inputs, tarNode.IPFSRead(block.FileHash)) + } + + outputs := tarNode.ECReconstruct(*srcRed, 
chosenBlockIndexes, inputs...) + tarNode.ChunkedJoin(srcRed.ChunkSize, outputs.Streams...).Length(obj.Object.Size).IPFSWrite(fmt.Sprintf("%d", i)) + } + + plan, err := planBlder.Build() + if err != nil { + return nil, fmt.Errorf("building io plan: %w", err) + } + + exec, err := plans.Execute(*plan) + if err != nil { + return nil, fmt.Errorf("executing io plan: %w", err) + } + + ioRet, err := exec.Wait() + if err != nil { + return nil, fmt.Errorf("executing io plan: %w", err) + } + + var blocks []stgmod.ObjectBlock + for i := range uploadNodes { + blocks = append(blocks, stgmod.ObjectBlock{ + ObjectID: obj.Object.ObjectID, + Index: 0, + NodeID: uploadNodes[i].Node.NodeID, + FileHash: ioRet.ResultValues[fmt.Sprintf("%d", i)].(string), + }) + } + + return &coormq.ChangeObjectRedundancyEntry{ + ObjectID: obj.Object.ObjectID, + Redundancy: tarRed, + Blocks: blocks, + }, nil +} + +func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk.ECRedundancy, tarRed *cdssdk.ECRedundancy, uploadNodes []*NodeLoadInfo) (*coormq.ChangeObjectRedundancyEntry, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return nil, fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + var chosenBlocks []stgmod.ObjectBlockDetail + var chosenBlockIndexes []int + for _, block := range obj.Blocks { + if len(block.CachedNodeIDs) > 0 { + chosenBlocks = append(chosenBlocks, block) + chosenBlockIndexes = append(chosenBlockIndexes, block.Index) + } + + if len(chosenBlocks) == srcRed.K { + break + } + } + + if len(chosenBlocks) < srcRed.K { + return nil, fmt.Errorf("no enough blocks to reconstruct the original file data") + } + + // 目前EC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块 + planBlder := plans.NewPlanBuilder() + + var newBlocks []stgmod.ObjectBlock + shouldUpdateBlocks := false + for i := range obj.Blocks { + newBlocks = append(newBlocks, stgmod.ObjectBlock{ + ObjectID: obj.Object.ObjectID, + Index: 
i, + NodeID: uploadNodes[i].Node.NodeID, + FileHash: obj.Blocks[i].FileHash, + }) + + // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更 + if lo.Contains(obj.Blocks[i].NodeIDs, uploadNodes[i].Node.NodeID) { + continue + } + + shouldUpdateBlocks = true + + // 新选的节点不在Block表中,但实际上保存了分块的数据,那么只需建立一条Block记录即可 + if lo.Contains(obj.Blocks[i].CachedNodeIDs, uploadNodes[i].Node.NodeID) { + continue + } + + // 否则就要重建出这个节点需要的块 + tarNode := planBlder.AtAgent(uploadNodes[i].Node) + + var inputs []*plans.AgentStream + for _, block := range chosenBlocks { + inputs = append(inputs, tarNode.IPFSRead(block.FileHash)) + } + + // 输出只需要自己要保存的那一块 + tarNode.ECReconstructAny(*srcRed, chosenBlockIndexes, []int{i}, inputs...).Stream(0).IPFSWrite("") + } + + plan, err := planBlder.Build() + if err != nil { + return nil, fmt.Errorf("building io plan: %w", err) + } + + exec, err := plans.Execute(*plan) + if err != nil { + return nil, fmt.Errorf("executing io plan: %w", err) + } + + // 如果没有任何Plan,Wait会直接返回成功 + _, err = exec.Wait() + if err != nil { + return nil, fmt.Errorf("executing io plan: %w", err) + } + + if !shouldUpdateBlocks { + return nil, nil + } + + return &coormq.ChangeObjectRedundancyEntry{ + ObjectID: obj.Object.ObjectID, + Redundancy: tarRed, + Blocks: newBlocks, + }, nil +} + +func (t *CheckPackageRedundancy) pinObject(nodeID cdssdk.NodeID, fileHash string) error { + agtCli, err := stgglb.AgentMQPool.Acquire(nodeID) + if err != nil { + return fmt.Errorf("new agent client: %w", err) + } + defer stgglb.AgentMQPool.Release(agtCli) + + pinObjResp, err := agtCli.StartPinningObject(agtmq.NewStartPinningObject(fileHash)) + if err != nil { + return fmt.Errorf("start pinning object: %w", err) + } + + for { + waitResp, err := agtCli.WaitPinningObject(agtmq.NewWaitPinningObject(pinObjResp.TaskID, int64(time.Second)*5)) + if err != nil { + return fmt.Errorf("waitting pinning object: %w", err) + } + + if waitResp.IsComplete { + if waitResp.Error != "" { + return fmt.Errorf("agent pinning object: %s", 
waitResp.Error) + } + + break + } + } + + return nil +} + +func init() { + RegisterMessageConvertor(NewCheckPackageRedundancy) +} diff --git a/scanner/internal/event/event.go b/scanner/internal/event/event.go index d32a3d5..f9fdf40 100644 --- a/scanner/internal/event/event.go +++ b/scanner/internal/event/event.go @@ -42,6 +42,8 @@ func FromMessage(msg scevt.Event) (Event, error) { return event, nil } -func RegisterMessageConvertor[T any](converter func(msg T) Event) { - typedispatcher.Add(msgDispatcher, converter) +func RegisterMessageConvertor[T any, TEvt Event](converter func(msg T) TEvt) { + typedispatcher.Add(msgDispatcher, func(msg T) Event { + return converter(msg) + }) } diff --git a/scanner/internal/event/event_test.go b/scanner/internal/event/event_test.go new file mode 100644 index 0000000..5ff44e1 --- /dev/null +++ b/scanner/internal/event/event_test.go @@ -0,0 +1,76 @@ +package event + +import ( + "testing" + + "github.com/samber/lo" + . "github.com/smartystreets/goconvey/convey" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" +) + +func Test_chooseSoManyNodes(t *testing.T) { + testcases := []struct { + title string + allNodes []*NodeLoadInfo + count int + expectedNodeIDs []cdssdk.NodeID + }{ + { + title: "节点数量充足", + allNodes: []*NodeLoadInfo{ + {Node: model.Node{NodeID: cdssdk.NodeID(1)}}, + {Node: model.Node{NodeID: cdssdk.NodeID(2)}}, + }, + count: 2, + expectedNodeIDs: []cdssdk.NodeID{1, 2}, + }, + { + title: "节点数量超过", + allNodes: []*NodeLoadInfo{ + {Node: model.Node{NodeID: cdssdk.NodeID(1)}}, + {Node: model.Node{NodeID: cdssdk.NodeID(2)}}, + {Node: model.Node{NodeID: cdssdk.NodeID(3)}}, + }, + count: 2, + expectedNodeIDs: []cdssdk.NodeID{1, 2}, + }, + { + title: "只有一个节点,节点数量不够", + allNodes: []*NodeLoadInfo{ + {Node: model.Node{NodeID: cdssdk.NodeID(1)}}, + }, + count: 3, + expectedNodeIDs: []cdssdk.NodeID{1, 1, 1}, + }, + { + title: "多个同地区节点,节点数量不够", + allNodes: []*NodeLoadInfo{ + 
{Node: model.Node{NodeID: cdssdk.NodeID(1)}}, + {Node: model.Node{NodeID: cdssdk.NodeID(2)}}, + }, + count: 5, + expectedNodeIDs: []cdssdk.NodeID{1, 1, 1, 2, 2}, + }, + { + title: "节点数量不够,且在不同地区", + allNodes: []*NodeLoadInfo{ + {Node: model.Node{NodeID: cdssdk.NodeID(1), LocationID: cdssdk.LocationID(1)}}, + {Node: model.Node{NodeID: cdssdk.NodeID(2), LocationID: cdssdk.LocationID(2)}}, + }, + count: 5, + expectedNodeIDs: []cdssdk.NodeID{1, 2, 1, 2, 1}, + }, + } + + for _, test := range testcases { + Convey(test.title, t, func() { + var t CheckPackageRedundancy + chosenNodes := t.chooseSoManyNodes(test.count, test.allNodes) + + chosenNodeIDs := lo.Map(chosenNodes, func(item *NodeLoadInfo, idx int) cdssdk.NodeID { return item.Node.NodeID }) + + So(chosenNodeIDs, ShouldResemble, test.expectedNodeIDs) + }) + } +} diff --git a/scanner/internal/tickevent/batch_check_package_redudancy.go b/scanner/internal/tickevent/batch_check_package_redudancy.go new file mode 100644 index 0000000..1fd1a93 --- /dev/null +++ b/scanner/internal/tickevent/batch_check_package_redudancy.go @@ -0,0 +1,48 @@ +package tickevent + +import ( + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" + evt "gitlink.org.cn/cloudream/storage/scanner/internal/event" +) + +type BatchCheckPackageRedundancy struct { + lastCheckStart int +} + +func NewBatchCheckPackageRedundancy() *BatchCheckPackageRedundancy { + return &BatchCheckPackageRedundancy{} +} + +func (e *BatchCheckPackageRedundancy) Execute(ctx ExecuteContext) { + log := logger.WithType[BatchCheckPackageRedundancy]("TickEvent") + log.Debugf("begin") + defer log.Debugf("end") + + // TODO 更好的策略 + nowHour := time.Now().Hour() + if nowHour > 6 { + return + } + + packageIDs, err := ctx.Args.DB.Package().BatchGetAllPackageIDs(ctx.Args.DB.SQLCtx(), e.lastCheckStart, CheckPackageBatchSize) + if err != nil { + log.Warnf("batch get package ids failed, err: %s", err.Error()) + return + } 
+ + for _, id := range packageIDs { + ctx.Args.EventExecutor.Post(evt.NewCheckPackageRedundancy(event.NewCheckPackageRedundancy(id))) + } + + // 如果结果的长度小于预期的长度,则认为已经查询了所有,下次从头再来 + if len(packageIDs) < CheckPackageBatchSize { + e.lastCheckStart = 0 + log.Debugf("all package checked, next time will start check at offset 0") + + } else { + e.lastCheckStart += CheckPackageBatchSize + } +} diff --git a/scanner/main.go b/scanner/main.go index 2b01002..181b732 100644 --- a/scanner/main.go +++ b/scanner/main.go @@ -126,4 +126,6 @@ func startTickEvent(tickExecutor *tickevent.Executor) { tickExecutor.Start(tickevent.NewCheckAgentState(), 5*60*1000, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) tickExecutor.Start(tickevent.NewCheckCache(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) + + tickExecutor.Start(tickevent.NewBatchCheckPackageRedundancy(), interval, tickevent.StartOption{RandomStartDelayMs: 10 * 60 * 1000}) }