From d027969bf79a2f182b834f44e5fc26c7b2557d0c Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 3 Jan 2024 09:44:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E8=BD=BDPackage=E6=97=B6=E5=B0=86?= =?UTF-8?q?=E4=BD=BF=E7=94=A8=E5=88=B0=E7=9A=84Block=E7=BC=93=E5=AD=98?= =?UTF-8?q?=E5=88=B0IPFS?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/mq/storage.go | 30 +-- agent/internal/task/storage_load_package.go | 219 ++++++++++++++++++- client/internal/cmdline/storage.go | 4 +- client/internal/http/storage.go | 4 +- client/internal/services/storage.go | 56 ++++- client/internal/task/storage_load_package.go | 100 --------- common/pkgs/db/object_block.go | 10 +- common/pkgs/mq/coordinator/storage.go | 17 +- coordinator/internal/mq/storage.go | 26 +++ 9 files changed, 308 insertions(+), 158 deletions(-) delete mode 100644 client/internal/task/storage_load_package.go diff --git a/agent/internal/mq/storage.go b/agent/internal/mq/storage.go index b87f1d4..bdc006e 100644 --- a/agent/internal/mq/storage.go +++ b/agent/internal/mq/storage.go @@ -24,31 +24,7 @@ import ( ) func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage) (*agtmq.StartStorageLoadPackageResp, *mq.CodeMessage) { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - logger.Warnf("new coordinator client: %s", err.Error()) - - return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed") - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID)) - if err != nil { - logger.WithField("StorageID", msg.StorageID). - Warnf("getting storage info: %s", err.Error()) - - return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed") - } - - outputDirPath := utils.MakeStorageLoadPackagePath(getStgResp.Directory, msg.UserID, msg.PackageID) - if err = os.MkdirAll(outputDirPath, 0755); err != nil { - logger.WithField("StorageID", msg.StorageID). - Warnf("creating output directory: %s", err.Error()) - - return nil, mq.Failed(errorcode.OperationFailed, "create output directory failed") - } - - tsk := svc.taskManager.StartNew(mytask.NewStorageLoadPackage(msg.UserID, msg.PackageID, outputDirPath)) + tsk := svc.taskManager.StartNew(mytask.NewStorageLoadPackage(msg.UserID, msg.PackageID, msg.StorageID)) return mq.ReplyOK(agtmq.NewStartStorageLoadPackageResp(tsk.ID())) } @@ -70,7 +46,7 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (* loadTsk := tsk.Body().(*mytask.StorageLoadPackage) - return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath)) + return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullOutputPath)) } else { if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { @@ -82,7 +58,7 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (* loadTsk := tsk.Body().(*mytask.StorageLoadPackage) - return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath)) + return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullOutputPath)) } return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, "", "")) diff --git a/agent/internal/task/storage_load_package.go b/agent/internal/task/storage_load_package.go index 79e0b70..a2654a2 100644 --- a/agent/internal/task/storage_load_package.go +++ b/agent/internal/task/storage_load_package.go @@ -1,30 +1,231 @@ package task import ( + "fmt" + "io" + "os" + "path/filepath" "time" + "gitlink.org.cn/cloudream/common/pkgs/ipfs" "gitlink.org.cn/cloudream/common/pkgs/task" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/cmd" + myio "gitlink.org.cn/cloudream/common/utils/io" + myref "gitlink.org.cn/cloudream/common/utils/reflect" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" + "gitlink.org.cn/cloudream/storage/common/pkgs/ec" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" + "gitlink.org.cn/cloudream/storage/common/utils" ) type StorageLoadPackage struct { - cmd *cmd.DownloadPackage - FullPath string + FullOutputPath string + + userID cdssdk.UserID + packageID cdssdk.PackageID + storageID cdssdk.StorageID + pinnedBlocks []stgmod.ObjectBlock } -func NewStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, outputPath string) *StorageLoadPackage { +func NewStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) *StorageLoadPackage { return &StorageLoadPackage{ - cmd: cmd.NewDownloadPackage(userID, packageID, outputPath), - FullPath: outputPath, + userID: userID, + packageID: packageID, + storageID: storageID, } } func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { - err := t.cmd.Execute(&cmd.DownloadPackageContext{ - Distlock: ctx.distlock, - }) + err := t.do(task, ctx) complete(err, CompleteOption{ RemovingDelay: time.Minute, }) } + +func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) error { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + ipfsCli, err := stgglb.IPFSPool.Acquire() + if err != nil { + return fmt.Errorf("new IPFS client: %w", err) + } + defer stgglb.IPFSPool.Release(ipfsCli) + + getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID)) + if err != nil { + return fmt.Errorf("request to coordinator: %w", err) + } + + outputDirPath := utils.MakeStorageLoadPackagePath(getStgResp.Directory, t.userID, t.packageID) + if err = os.MkdirAll(outputDirPath, 0755); err != nil { + return fmt.Errorf("creating output directory: %w", err) + } + t.FullOutputPath = outputDirPath + + getObjectDetails, err := coorCli.GetPackageObjectDetails(coormq.NewGetPackageObjectDetails(t.packageID)) + if err != nil { + return fmt.Errorf("getting package object details: %w", err) + } + + mutex, err := reqbuilder.NewBuilder(). + // 提前占位 + Metadata().StoragePackage().CreateOne(t.userID, t.storageID, t.packageID). + // 保护在storage目录中下载的文件 + Storage().Buzy(t.storageID). + // 保护下载文件时同时保存到IPFS的文件 + IPFS().Buzy(getStgResp.NodeID). + MutexLock(ctx.distlock) + if err != nil { + return fmt.Errorf("acquire locks failed, err: %w", err) + } + defer mutex.Unlock() + + for _, obj := range getObjectDetails.Objects { + err := t.downloadOne(ipfsCli, outputDirPath, obj) + if err != nil { + return err + } + } + + _, err = coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(t.userID, t.storageID, t.packageID, t.pinnedBlocks)) + if err != nil { + return fmt.Errorf("loading package to storage: %w", err) + } + + return err +} + +func (t *StorageLoadPackage) downloadOne(ipfsCli *ipfs.PoolClient, dir string, obj stgmod.ObjectDetail) error { + var file io.ReadCloser + + switch red := obj.Object.Redundancy.(type) { + case *cdssdk.NoneRedundancy: + reader, err := t.downloadNoneOrRepObject(ipfsCli, obj) + if err != nil { + return fmt.Errorf("downloading object: %w", err) + } + file = reader + + case *cdssdk.RepRedundancy: + reader, err := t.downloadNoneOrRepObject(ipfsCli, obj) + if err != nil { + return fmt.Errorf("downloading rep object: %w", err) + } + file = reader + + case *cdssdk.ECRedundancy: + reader, pinnedBlocks, err := t.downloadECObject(ipfsCli, obj, red) + if err != nil { + return fmt.Errorf("downloading ec object: %w", err) + } + file = reader + t.pinnedBlocks = append(t.pinnedBlocks, pinnedBlocks...) + + default: + return fmt.Errorf("unknow redundancy type: %v", myref.TypeOfValue(obj.Object.Redundancy)) + } + defer file.Close() + + fullPath := filepath.Join(dir, obj.Object.Path) + + lastDirPath := filepath.Dir(fullPath) + if err := os.MkdirAll(lastDirPath, 0755); err != nil { + return fmt.Errorf("creating object last dir: %w", err) + } + + outputFile, err := os.Create(fullPath) + if err != nil { + return fmt.Errorf("creating object file: %w", err) + } + defer outputFile.Close() + + if _, err := io.Copy(outputFile, file); err != nil { + return fmt.Errorf("writting object to file: %w", err) + } + + return nil +} + +func (t *StorageLoadPackage) downloadNoneOrRepObject(ipfsCli *ipfs.PoolClient, obj stgmod.ObjectDetail) (io.ReadCloser, error) { + if len(obj.Blocks) == 0 { + return nil, fmt.Errorf("no node has this object") + } + + // 异步pin,不管实际有没有成功 + go func() { + ipfsCli.Pin(obj.Object.FileHash) + }() + + file, err := ipfsCli.OpenRead(obj.Object.FileHash) + if err != nil { + return nil, err + } + + return file, nil +} + +func (t *StorageLoadPackage) downloadECObject(ipfsCli *ipfs.PoolClient, obj stgmod.ObjectDetail, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, []stgmod.ObjectBlock, error) { + var chosenBlocks []stgmod.GrouppedObjectBlock + grpBlocks := obj.GroupBlocks() + for i := range grpBlocks { + if len(chosenBlocks) == ecRed.K { + break + } + + chosenBlocks = append(chosenBlocks, grpBlocks[i]) + } + + if len(chosenBlocks) < ecRed.K { + return nil, nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", ecRed.K, len(chosenBlocks)) + } + + var fileStrs []io.ReadCloser + + rs, err := ec.NewRs(ecRed.K, ecRed.N, ecRed.ChunkSize) + if err != nil { + return nil, nil, fmt.Errorf("new rs: %w", err) + } + + for i := range chosenBlocks { + // 异步pin,不管实际有没有成功 + go func() { + ipfsCli.Pin(chosenBlocks[i].FileHash) + }() + + str, err := ipfsCli.OpenRead(chosenBlocks[i].FileHash) + if err != nil { + for i -= 1; i >= 0; i-- { + fileStrs[i].Close() + } + return nil, nil, fmt.Errorf("donwloading file: %w", err) + } + + fileStrs = append(fileStrs, str) + } + + fileReaders, filesCloser := myio.ToReaders(fileStrs) + + var indexes []int + var pinnedBlocks []stgmod.ObjectBlock + for _, b := range chosenBlocks { + indexes = append(indexes, b.Index) + pinnedBlocks = append(pinnedBlocks, stgmod.ObjectBlock{ + ObjectID: b.ObjectID, + Index: b.Index, + NodeID: *stgglb.Local.NodeID, + FileHash: b.FileHash, + }) + } + + outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes)) + return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(ecRed.ChunkSize)), obj.Object.Size), func(c io.ReadCloser) { + filesCloser() + outputsCloser() + }), pinnedBlocks, nil +} diff --git a/client/internal/cmdline/storage.go b/client/internal/cmdline/storage.go index b99bd4a..355e85c 100644 --- a/client/internal/cmdline/storage.go +++ b/client/internal/cmdline/storage.go @@ -8,13 +8,13 @@ import ( ) func StorageLoadPackage(ctx CommandContext, packageID cdssdk.PackageID, storageID cdssdk.StorageID) error { - taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageLoadPackage(0, packageID, storageID) + nodeID, taskID, err := ctx.Cmdline.Svc.StorageSvc().StartStorageLoadPackage(0, packageID, storageID) if err != nil { return fmt.Errorf("start loading package to storage: %w", err) } for { - complete, fullPath, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10) + complete, fullPath, err := ctx.Cmdline.Svc.StorageSvc().WaitStorageLoadPackage(nodeID, taskID, time.Second*10) if complete { if err != nil { return fmt.Errorf("moving complete with: %w", err) diff --git a/client/internal/http/storage.go b/client/internal/http/storage.go index d3d90e5..1b30f1c 100644 --- a/client/internal/http/storage.go +++ b/client/internal/http/storage.go @@ -40,7 +40,7 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) { return } - taskID, err := s.svc.StorageSvc().StartStorageLoadPackage(*req.UserID, *req.PackageID, *req.StorageID) + nodeID, taskID, err := s.svc.StorageSvc().StartStorageLoadPackage(*req.UserID, *req.PackageID, *req.StorageID) if err != nil { log.Warnf("start storage load package: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage load package failed")) @@ -48,7 +48,7 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) { } for { - complete, fullPath, err := s.svc.StorageSvc().WaitStorageLoadPackage(taskID, time.Second*10) + complete, fullPath, err := s.svc.StorageSvc().WaitStorageLoadPackage(nodeID, taskID, time.Second*10) if complete { if err != nil { log.Warnf("loading complete with: %s", err.Error()) diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index 8977f92..d5671a4 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -6,7 +6,6 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/client/internal/task" stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" @@ -21,18 +20,55 @@ func (svc *Service) StorageSvc() *StorageService { return &StorageService{Service: svc} } -func (svc *StorageService) StartStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) (string, error) { - tsk := svc.TaskMgr.StartNew(task.NewStorageLoadPackage(userID, packageID, storageID)) - return tsk.ID(), nil +func (svc *StorageService) StartStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) (cdssdk.NodeID, string, error) { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return 0, "", fmt.Errorf("new coordinator client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + stgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(userID, storageID)) + if err != nil { + return 0, "", fmt.Errorf("getting storage info: %w", err) + } + + agentCli, err := stgglb.AgentMQPool.Acquire(stgResp.NodeID) + if err != nil { + return 0, "", fmt.Errorf("new agent client: %w", err) + } + defer stgglb.AgentMQPool.Release(agentCli) + + startResp, err := agentCli.StartStorageLoadPackage(agtmq.NewStartStorageLoadPackage(userID, packageID, storageID)) + if err != nil { + return 0, "", fmt.Errorf("start storage load package: %w", err) + } + + return stgResp.NodeID, startResp.TaskID, nil } -func (svc *StorageService) WaitStorageLoadPackage(taskID string, waitTimeout time.Duration) (bool, string, error) { - tsk := svc.TaskMgr.FindByID(taskID) - if tsk.WaitTimeout(waitTimeout) { - loadTsk := tsk.Body().(*task.StorageLoadPackage) - return true, loadTsk.ResultFullPath, tsk.Error() +func (svc *StorageService) WaitStorageLoadPackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, string, error) { + agentCli, err := stgglb.AgentMQPool.Acquire(nodeID) + if err != nil { + // TODO 失败是否要当做任务已经结束? + return true, "", fmt.Errorf("new agent client: %w", err) + } + defer stgglb.AgentMQPool.Release(agentCli) + + waitResp, err := agentCli.WaitStorageLoadPackage(agtmq.NewWaitStorageLoadPackage(taskID, waitTimeout.Milliseconds())) + if err != nil { + // TODO 请求失败是否要当做任务已经结束? + return true, "", fmt.Errorf("wait storage load package: %w", err) + } + + if !waitResp.IsComplete { + return false, "", nil } - return false, "", nil + + if waitResp.Error != "" { + return true, "", fmt.Errorf("%s", waitResp.Error) + } + + return true, waitResp.FullPath, nil } func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, storageID int64) error { diff --git a/client/internal/task/storage_load_package.go b/client/internal/task/storage_load_package.go deleted file mode 100644 index 644c407..0000000 --- a/client/internal/task/storage_load_package.go +++ /dev/null @@ -1,100 +0,0 @@ -package task - -import ( - "fmt" - "time" - - "gitlink.org.cn/cloudream/common/pkgs/task" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" - agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" - coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" -) - -// TODO 可以考虑不用Task来实现这些逻辑 -type StorageLoadPackage struct { - userID cdssdk.UserID - packageID cdssdk.PackageID - storageID cdssdk.StorageID - - ResultFullPath string -} - -func NewStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) *StorageLoadPackage { - return &StorageLoadPackage{ - userID: userID, - packageID: packageID, - storageID: storageID, - } -} - -func (t *StorageLoadPackage) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { - err := t.do(ctx) - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) -} - -func (t *StorageLoadPackage) do(ctx TaskContext) error { - mutex, err := reqbuilder.NewBuilder(). - // 提前占位 - Metadata().StoragePackage().CreateOne(t.userID, t.storageID, t.packageID). - // 保护在storage目录中下载的文件 - Storage().Buzy(t.storageID). - MutexLock(ctx.distlock) - if err != nil { - return fmt.Errorf("acquire locks failed, err: %w", err) - } - defer mutex.Unlock() - - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return fmt.Errorf("new coordinator client: %w", err) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) - - getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(t.userID, t.storageID)) - if err != nil { - return fmt.Errorf("getting storage info: %w", err) - } - - // 然后向代理端发送移动文件的请求 - agentCli, err := stgglb.AgentMQPool.Acquire(getStgResp.NodeID) - if err != nil { - return fmt.Errorf("create agent client to %d failed, err: %w", getStgResp.NodeID, err) - } - defer stgglb.AgentMQPool.Release(agentCli) - - agentMoveResp, err := agentCli.StartStorageLoadPackage( - agtmq.NewStartStorageLoadPackage( - t.userID, - t.packageID, - t.storageID, - )) - if err != nil { - return fmt.Errorf("start loading package to storage: %w", err) - } - - for { - waitResp, err := agentCli.WaitStorageLoadPackage(agtmq.NewWaitStorageLoadPackage(agentMoveResp.TaskID, int64(time.Second)*5)) - if err != nil { - return fmt.Errorf("wait loading package: %w", err) - } - - if waitResp.IsComplete { - if waitResp.Error != "" { - return fmt.Errorf("agent loading package: %s", waitResp.Error) - } - - t.ResultFullPath = waitResp.FullPath - break - } - } - - _, err = coorCli.StoragePackageLoaded(coormq.NewStoragePackageLoaded(t.userID, t.storageID, t.packageID)) - if err != nil { - return fmt.Errorf("loading package to storage: %w", err) - } - return nil -} diff --git a/common/pkgs/db/object_block.go b/common/pkgs/db/object_block.go index 93c706f..9abcc76 100644 --- a/common/pkgs/db/object_block.go +++ b/common/pkgs/db/object_block.go @@ -31,6 +31,14 @@ func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index return err } +func (db *ObjectBlockDB) BatchCreate(ctx SQLContext, blocks []stgmod.ObjectBlock) error { + _, err := sqlx.NamedExec(ctx, + "insert ignore into ObjectBlock(ObjectID, `Index`, NodeID, FileHash) values(:ObjectID, :Index, :NodeID, :FileHash)", + blocks, + ) + return err +} + func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error { _, err := ctx.Exec("delete from ObjectBlock where ObjectID = ?", objectID) return err @@ -78,7 +86,7 @@ func (db *ObjectBlockDB) GetPackageBlockDetails(ctx SQLContext, packageID cdssdk var blocks []stgmod.ObjectBlock err = sqlx.Select(ctx, &blocks, - "select * from ObjectBlock where ObjectID = ? order by Index", + "select * from ObjectBlock where ObjectID = ? order by `Index`", obj.ObjectID, ) if err != nil { diff --git a/common/pkgs/mq/coordinator/storage.go b/common/pkgs/mq/coordinator/storage.go index a92169a..f12ce94 100644 --- a/common/pkgs/mq/coordinator/storage.go +++ b/common/pkgs/mq/coordinator/storage.go @@ -5,6 +5,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" ) @@ -55,19 +56,21 @@ var _ = Register(Service.StoragePackageLoaded) type StoragePackageLoaded struct { mq.MessageBodyBase - UserID cdssdk.UserID `json:"userID"` - StorageID cdssdk.StorageID `json:"storageID"` - PackageID cdssdk.PackageID `json:"packageID"` + UserID cdssdk.UserID `json:"userID"` + StorageID cdssdk.StorageID `json:"storageID"` + PackageID cdssdk.PackageID `json:"packageID"` + PinnedBlocks []stgmod.ObjectBlock `json:"pinnedBlocks"` } type StoragePackageLoadedResp struct { mq.MessageBodyBase } -func NewStoragePackageLoaded(userID cdssdk.UserID, stgID cdssdk.StorageID, packageID cdssdk.PackageID) *StoragePackageLoaded { +func NewStoragePackageLoaded(userID cdssdk.UserID, stgID cdssdk.StorageID, packageID cdssdk.PackageID, pinnedBlocks []stgmod.ObjectBlock) *StoragePackageLoaded { return &StoragePackageLoaded{ - UserID: userID, - PackageID: packageID, - StorageID: stgID, + UserID: userID, + PackageID: packageID, + StorageID: stgID, + PinnedBlocks: pinnedBlocks, } } func NewStoragePackageLoadedResp() *StoragePackageLoadedResp { diff --git a/coordinator/internal/mq/storage.go b/coordinator/internal/mq/storage.go index 08fe3e4..06ec0f6 100644 --- a/coordinator/internal/mq/storage.go +++ b/coordinator/internal/mq/storage.go @@ -25,6 +25,15 @@ func (svc *Service) GetStorageInfo(msg *coormq.GetStorageInfo) (*coormq.GetStora func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coormq.StoragePackageLoadedResp, *mq.CodeMessage) { err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + // 可以不用检查用户是否存在 + if ok, _ := svc.db.Package().IsAvailable(tx, msg.UserID, msg.PackageID); !ok { + return fmt.Errorf("package is not available to user") + } + + if ok, _ := svc.db.Storage().IsAvailable(tx, msg.UserID, msg.StorageID); !ok { + return fmt.Errorf("storage is not available to user") + } + err := svc.db.StoragePackage().Create(tx, msg.StorageID, msg.PackageID, msg.UserID) if err != nil { return fmt.Errorf("creating storage package: %w", err) @@ -35,6 +44,23 @@ func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coo return fmt.Errorf("creating storage package log: %w", err) } + stg, err := svc.db.Storage().GetByID(tx, msg.StorageID) + if err != nil { + return fmt.Errorf("getting storage: %w", err) + } + + err = svc.db.PinnedObject().CreateFromPackage(tx, msg.PackageID, stg.NodeID) + if err != nil { + return fmt.Errorf("creating pinned object from package: %w", err) + } + + if len(msg.PinnedBlocks) > 0 { + err = svc.db.ObjectBlock().BatchCreate(tx, msg.PinnedBlocks) + if err != nil { + return fmt.Errorf("batch creating object block: %w", err) + } + } + return nil }) if err != nil {