Browse Source

Storage增加LocalBase和RemoteBase字段

gitlink
Sydonian 1 year ago
parent
commit
b6f2f51f48
11 changed files with 98 additions and 60 deletions
  1. +41
    -11
      agent/internal/mq/storage.go
  2. +8
    -5
      agent/internal/task/storage_load_package.go
  3. +1
    -1
      client/internal/http/server.go
  4. +11
    -9
      client/internal/http/storage.go
  5. +16
    -6
      client/internal/services/storage.go
  6. +1
    -0
      common/assets/scripts/create_database.sql
  7. +1
    -8
      common/pkgs/db/model/model.go
  8. +13
    -13
      common/pkgs/mq/agent/storage.go
  9. +4
    -5
      common/utils/utils.go
  10. +1
    -1
      scanner/internal/event/agent_check_storage.go
  11. +1
    -1
      scanner/internal/event/agent_storage_gc.go

+ 41
- 11
agent/internal/mq/storage.go View File

@@ -46,7 +46,7 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (*

loadTsk := tsk.Body().(*mytask.StorageLoadPackage)

return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullOutputPath))
return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.PackagePath, loadTsk.LocalBase, loadTsk.RemoteBase))

} else {
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) {
@@ -58,15 +58,33 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (*

loadTsk := tsk.Body().(*mytask.StorageLoadPackage)

return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullOutputPath))
return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.PackagePath, loadTsk.LocalBase, loadTsk.RemoteBase))
}

return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, "", ""))
return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, "", "", "", ""))
}
}

func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckResp, *mq.CodeMessage) {
infos, err := os.ReadDir(msg.Directory)
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return mq.ReplyOK(agtmq.NewStorageCheckResp(
err.Error(),
nil,
))
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

// TODO UserID。应该设计两种接口,一种需要UserID,一种不需要。
getStg, err := coorCli.GetStorage(coormq.ReqGetStorage(cdssdk.UserID(1), msg.StorageID))
if err != nil {
return mq.ReplyOK(agtmq.NewStorageCheckResp(
err.Error(),
nil,
))
}

entries, err := os.ReadDir(utils.MakeStorageLoadDirectory(getStg.Storage.LocalBase))
if err != nil {
logger.Warnf("list storage directory failed, err: %s", err.Error())
return mq.ReplyOK(agtmq.NewStorageCheckResp(
@@ -77,7 +95,7 @@ func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckRe

var stgPkgs []model.StoragePackage

userDirs := lo.Filter(infos, func(info fs.DirEntry, index int) bool { return info.IsDir() })
userDirs := lo.Filter(entries, func(info fs.DirEntry, index int) bool { return info.IsDir() })
for _, dir := range userDirs {
userIDInt, err := strconv.ParseInt(dir.Name(), 10, 64)
if err != nil {
@@ -85,7 +103,7 @@ func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckRe
continue
}

pkgDir := utils.MakeStorageLoadDirectory(msg.Directory, dir.Name())
pkgDir := filepath.Join(utils.MakeStorageLoadDirectory(getStg.Storage.LocalBase), dir.Name())
pkgDirs, err := os.ReadDir(pkgDir)
if err != nil {
logger.Warnf("reading package dir %s: %s", pkgDir, err.Error())
@@ -111,7 +129,19 @@ func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckRe
}

func (svc *Service) StorageGC(msg *agtmq.StorageGC) (*agtmq.StorageGCResp, *mq.CodeMessage) {
infos, err := os.ReadDir(msg.Directory)
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

// TODO UserID。应该设计两种接口,一种需要UserID,一种不需要。
getStg, err := coorCli.GetStorage(coormq.ReqGetStorage(cdssdk.UserID(1), msg.StorageID))
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}

entries, err := os.ReadDir(utils.MakeStorageLoadDirectory(getStg.Storage.LocalBase))
if err != nil {
logger.Warnf("list storage directory failed, err: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "list directory files failed")
@@ -132,12 +162,12 @@ func (svc *Service) StorageGC(msg *agtmq.StorageGC) (*agtmq.StorageGCResp, *mq.C
pkgs[pkgIDStr] = true
}

userDirs := lo.Filter(infos, func(info fs.DirEntry, index int) bool { return info.IsDir() })
userDirs := lo.Filter(entries, func(info fs.DirEntry, index int) bool { return info.IsDir() })
for _, dir := range userDirs {
pkgMap, ok := userPkgs[dir.Name()]
// 第一级目录名是UserID,先删除UserID在StoragePackage表里没出现过的文件夹
if !ok {
rmPath := filepath.Join(msg.Directory, dir.Name())
rmPath := filepath.Join(utils.MakeStorageLoadDirectory(getStg.Storage.LocalBase), dir.Name())
err := os.RemoveAll(rmPath)
if err != nil {
logger.Warnf("removing user dir %s: %s", rmPath, err.Error())
@@ -147,7 +177,7 @@ func (svc *Service) StorageGC(msg *agtmq.StorageGC) (*agtmq.StorageGCResp, *mq.C
continue
}

pkgDir := utils.MakeStorageLoadDirectory(msg.Directory, dir.Name())
pkgDir := filepath.Join(utils.MakeStorageLoadDirectory(getStg.Storage.LocalBase), dir.Name())
// 遍历每个UserID目录的packages目录里的内容
pkgs, err := os.ReadDir(pkgDir)
if err != nil {
@@ -188,7 +218,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka
return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed")
}

fullPath := filepath.Clean(filepath.Join(getStgResp.Storage.Directory, msg.Path))
fullPath := filepath.Clean(filepath.Join(getStgResp.Storage.LocalBase, msg.Path))

var uploadFilePathes []string
err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error {


+ 8
- 5
agent/internal/task/storage_load_package.go View File

@@ -26,7 +26,9 @@ import (
)

type StorageLoadPackage struct {
FullOutputPath string
PackagePath string
LocalBase string
RemoteBase string

userID cdssdk.UserID
packageID cdssdk.PackageID
@@ -67,11 +69,12 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e
return fmt.Errorf("request to coordinator: %w", err)
}

outputDirPath := utils.MakeStorageLoadPackagePath(getStgResp.Storage.Directory, t.userID, t.packageID)
if err = os.MkdirAll(outputDirPath, 0755); err != nil {
t.PackagePath = utils.MakeLoadedPackagePath(t.userID, t.packageID)
fullLocalPath := filepath.Join(getStgResp.Storage.LocalBase, t.PackagePath)

if err = os.MkdirAll(fullLocalPath, 0755); err != nil {
return fmt.Errorf("creating output directory: %w", err)
}
t.FullOutputPath = outputDirPath

getObjectDetails, err := coorCli.GetPackageObjectDetails(coormq.ReqGetPackageObjectDetails(t.packageID))
if err != nil {
@@ -92,7 +95,7 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e
defer mutex.Unlock()

for _, obj := range getObjectDetails.Objects {
err := t.downloadOne(coorCli, ipfsCli, outputDirPath, obj)
err := t.downloadOne(coorCli, ipfsCli, fullLocalPath, obj)
if err != nil {
return err
}


+ 1
- 1
client/internal/http/server.go View File

@@ -58,7 +58,7 @@ func (s *Server) initRouters() {

rt.POST(cdssdk.StorageLoadPackagePath, s.Storage().LoadPackage)
rt.POST(cdssdk.StorageCreatePackagePath, s.Storage().CreatePackage)
rt.GET(cdssdk.StorageGetInfoPath, s.Storage().GetInfo)
rt.GET(cdssdk.StorageGetPath, s.Storage().Get)

rt.POST(cdssdk.CacheMovePackagePath, s.Cache().MovePackage)



+ 11
- 9
client/internal/http/storage.go View File

@@ -2,6 +2,7 @@ package http

import (
"net/http"
"path/filepath"
"time"

"github.com/gin-gonic/gin"
@@ -38,7 +39,7 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) {
}

for {
complete, fullPath, err := s.svc.StorageSvc().WaitStorageLoadPackage(nodeID, taskID, time.Second*10)
complete, ret, err := s.svc.StorageSvc().WaitStorageLoadPackage(nodeID, taskID, time.Second*10)
if complete {
if err != nil {
log.Warnf("loading complete with: %s", err.Error())
@@ -47,7 +48,10 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) {
}

ctx.JSON(http.StatusOK, OK(cdssdk.StorageLoadPackageResp{
FullPath: fullPath,
FullPath: filepath.Join(ret.RemoteBase, ret.PackagePath),
PackagePath: ret.PackagePath,
LocalBase: ret.LocalBase,
RemoteBase: ret.RemoteBase,
}))
return
}
@@ -101,10 +105,10 @@ func (s *StorageService) CreatePackage(ctx *gin.Context) {
}
}

func (s *StorageService) GetInfo(ctx *gin.Context) {
log := logger.WithField("HTTP", "Storage.GetInfo")
func (s *StorageService) Get(ctx *gin.Context) {
log := logger.WithField("HTTP", "Storage.Get")

var req cdssdk.StorageGetInfoReq
var req cdssdk.StorageGet
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding query: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
@@ -118,9 +122,7 @@ func (s *StorageService) GetInfo(ctx *gin.Context) {
return
}

ctx.JSON(http.StatusOK, OK(cdssdk.StorageGetInfoResp{
Name: info.Name,
NodeID: info.NodeID,
Directory: info.Directory,
ctx.JSON(http.StatusOK, OK(cdssdk.StorageGetResp{
Storage: *info,
}))
}

+ 16
- 6
client/internal/services/storage.go View File

@@ -76,29 +76,39 @@ func (svc *StorageService) StartStorageLoadPackage(userID cdssdk.UserID, package
return stgResp.Storage.NodeID, startResp.TaskID, nil
}

func (svc *StorageService) WaitStorageLoadPackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, string, error) {
type StorageLoadPackageResult struct {
PackagePath string
LocalBase string
RemoteBase string
}

func (svc *StorageService) WaitStorageLoadPackage(nodeID cdssdk.NodeID, taskID string, waitTimeout time.Duration) (bool, *StorageLoadPackageResult, error) {
agentCli, err := stgglb.AgentMQPool.Acquire(nodeID)
if err != nil {
// TODO 失败是否要当做任务已经结束?
return true, "", fmt.Errorf("new agent client: %w", err)
return true, nil, fmt.Errorf("new agent client: %w", err)
}
defer stgglb.AgentMQPool.Release(agentCli)

waitResp, err := agentCli.WaitStorageLoadPackage(agtmq.NewWaitStorageLoadPackage(taskID, waitTimeout.Milliseconds()))
if err != nil {
// TODO 请求失败是否要当做任务已经结束?
return true, "", fmt.Errorf("wait storage load package: %w", err)
return true, nil, fmt.Errorf("wait storage load package: %w", err)
}

if !waitResp.IsComplete {
return false, "", nil
return false, nil, nil
}

if waitResp.Error != "" {
return true, "", fmt.Errorf("%s", waitResp.Error)
return true, nil, fmt.Errorf("%s", waitResp.Error)
}

return true, waitResp.FullPath, nil
return true, &StorageLoadPackageResult{
PackagePath: waitResp.PackagePath,
LocalBase: waitResp.LocalBase,
RemoteBase: waitResp.RemoteBase,
}, nil
}

func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, storageID int64) error {


+ 1
- 0
common/assets/scripts/create_database.sql View File

@@ -44,6 +44,7 @@ create table Storage (
Name varchar(100) not null comment '存储服务名称',
NodeID int not null comment '存储服务所在节点的ID',
Directory varchar(4096) not null comment '存储服务所在节点的目录',
Remote varchar(4096) not null,
State varchar(100) comment '状态'
) comment = "存储服务表";



+ 1
- 8
common/pkgs/db/model/model.go View File

@@ -11,14 +11,7 @@ import (
)

// TODO 可以考虑逐步迁移到cdssdk中。迁移思路:数据对象应该包含的字段都迁移到cdssdk中,内部使用的一些特殊字段则留在这里

type Storage struct {
StorageID cdssdk.StorageID `db:"StorageID" json:"storageID"`
Name string `db:"Name" json:"name"`
NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"`
Directory string `db:"Directory" json:"directory"`
State string `db:"State" json:"state"`
}
type Storage = cdssdk.Storage

type User struct {
UserID cdssdk.UserID `db:"UserID" json:"userID"`


+ 13
- 13
common/pkgs/mq/agent/storage.go View File

@@ -61,9 +61,11 @@ type WaitStorageLoadPackage struct {
}
type WaitStorageLoadPackageResp struct {
mq.MessageBodyBase
IsComplete bool `json:"isComplete"`
Error string `json:"error"`
FullPath string `json:"fullPath"`
IsComplete bool `json:"isComplete"`
Error string `json:"error"`
PackagePath string `json:"packagePath"` // 加载后的Package的路径,相对于数据库中配置的Directory
LocalBase string `json:"localBase"` // 存储服务本地的目录,LocalBase + PackagePath = Package在代理节点上的完整路径
RemoteBase string `json:"remoteBase"` // 存储服务远程的目录,RemoteBase + PackagePath = Package在存储服务中的完整路径
}

func NewWaitStorageLoadPackage(taskID string, waitTimeoutMs int64) *WaitStorageLoadPackage {
@@ -72,11 +74,13 @@ func NewWaitStorageLoadPackage(taskID string, waitTimeoutMs int64) *WaitStorageL
WaitTimeoutMs: waitTimeoutMs,
}
}
func NewWaitStorageLoadPackageResp(isComplete bool, err string, fullPath string) *WaitStorageLoadPackageResp {
func NewWaitStorageLoadPackageResp(isComplete bool, err string, packagePath string, localBase string, remoteBase string) *WaitStorageLoadPackageResp {
return &WaitStorageLoadPackageResp{
IsComplete: isComplete,
Error: err,
FullPath: fullPath,
IsComplete: isComplete,
Error: err,
PackagePath: packagePath,
LocalBase: localBase,
RemoteBase: remoteBase,
}
}
func (client *Client) WaitStorageLoadPackage(msg *WaitStorageLoadPackage, opts ...mq.RequestOption) (*WaitStorageLoadPackageResp, error) {
@@ -89,7 +93,6 @@ var _ = Register(Service.StorageCheck)
type StorageCheck struct {
mq.MessageBodyBase
StorageID cdssdk.StorageID `json:"storageID"`
Directory string `json:"directory"`
}
type StorageCheckResp struct {
mq.MessageBodyBase
@@ -97,10 +100,9 @@ type StorageCheckResp struct {
Packages []model.StoragePackage `json:"packages"`
}

func NewStorageCheck(storageID cdssdk.StorageID, directory string) *StorageCheck {
func NewStorageCheck(storageID cdssdk.StorageID) *StorageCheck {
return &StorageCheck{
StorageID: storageID,
Directory: directory,
}
}
func NewStorageCheckResp(dirState string, packages []model.StoragePackage) *StorageCheckResp {
@@ -119,17 +121,15 @@ var _ = Register(Service.StorageGC)
type StorageGC struct {
mq.MessageBodyBase
StorageID cdssdk.StorageID `json:"storageID"`
Directory string `json:"directory"`
Packages []model.StoragePackage `json:"packages"`
}
type StorageGCResp struct {
mq.MessageBodyBase
}

func ReqStorageGC(storageID cdssdk.StorageID, directory string, packages []model.StoragePackage) *StorageGC {
func ReqStorageGC(storageID cdssdk.StorageID, packages []model.StoragePackage) *StorageGC {
return &StorageGC{
StorageID: storageID,
Directory: directory,
Packages: packages,
}
}


+ 4
- 5
common/utils/utils.go View File

@@ -7,11 +7,10 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

// MakeStorageLoadPackagePath Load操作时,写入的文件夹的名称
func MakeStorageLoadPackagePath(stgDir string, userID cdssdk.UserID, packageID cdssdk.PackageID) string {
return filepath.Join(stgDir, strconv.FormatInt(int64(userID), 10), "packages", strconv.FormatInt(int64(packageID), 10))
func MakeLoadedPackagePath(userID cdssdk.UserID, packageID cdssdk.PackageID) string {
return filepath.Join("packages", strconv.FormatInt(int64(userID), 10), strconv.FormatInt(int64(packageID), 10))
}

func MakeStorageLoadDirectory(stgDir string, userIDStr string) string {
return filepath.Join(stgDir, userIDStr, "packages")
func MakeStorageLoadDirectory(stgDir string) string {
return filepath.Join(stgDir, "packages")
}

+ 1
- 1
scanner/internal/event/agent_check_storage.go View File

@@ -72,7 +72,7 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) {
}
defer stgglb.AgentMQPool.Release(agtCli)

checkResp, err := agtCli.StorageCheck(agtmq.NewStorageCheck(stg.StorageID, stg.Directory), mq.RequestOption{Timeout: time.Minute})
checkResp, err := agtCli.StorageCheck(agtmq.NewStorageCheck(stg.StorageID), mq.RequestOption{Timeout: time.Minute})
if err != nil {
log.WithField("NodeID", stg.NodeID).Warnf("checking storage: %s", err.Error())
return


+ 1
- 1
scanner/internal/event/agent_storage_gc.go View File

@@ -74,7 +74,7 @@ func (t *AgentStorageGC) Execute(execCtx ExecuteContext) {
}
defer stgglb.AgentMQPool.Release(agtCli)

_, err = agtCli.StorageGC(agtmq.ReqStorageGC(t.StorageID, getStg.Directory, stgPkgs), mq.RequestOption{Timeout: time.Minute})
_, err = agtCli.StorageGC(agtmq.ReqStorageGC(t.StorageID, stgPkgs), mq.RequestOption{Timeout: time.Minute})
if err != nil {
log.WithField("StorageID", t.StorageID).Warnf("storage gc: %s", err.Error())
return


Loading…
Cancel
Save