Browse Source

Merge pull request '增加ClientPool,解除对config的依赖' (#2) from feature_gxh into master

gitlink
baohan 2 years ago
parent
commit
0d46618f48
18 changed files with 471 additions and 187 deletions
  1. +13
    -13
      internal/config/config.go
  2. +0
    -19
      internal/services/cmd/agent.go
  3. +0
    -21
      internal/services/cmd/service.go
  4. +34
    -24
      internal/services/grpc/service.go
  5. +29
    -0
      internal/services/mq/agent.go
  6. +62
    -16
      internal/services/mq/cache.go
  7. +1
    -1
      internal/services/mq/object.go
  8. +15
    -0
      internal/services/mq/service.go
  9. +57
    -58
      internal/services/mq/storage.go
  10. +102
    -0
      internal/task/cache_move_package.go
  11. +43
    -0
      internal/task/create_ec_package.go
  12. +39
    -0
      internal/task/create_rep_package.go
  13. +26
    -0
      internal/task/download_package.go
  14. +14
    -1
      internal/task/ipfs_pin.go
  15. +14
    -1
      internal/task/ipfs_read.go
  16. +3
    -9
      internal/task/task.go
  17. +16
    -20
      main.go
  18. +3
    -4
      status_report.go

+ 13
- 13
internal/config/config.go View File

@@ -2,25 +2,25 @@ package config

import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
c "gitlink.org.cn/cloudream/common/utils/config"
"gitlink.org.cn/cloudream/common/utils/ipfs"
stgmodels "gitlink.org.cn/cloudream/storage-common/models"
"gitlink.org.cn/cloudream/storage-common/pkgs/grpc"
stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq"
)

type Config struct {
ID int64 `json:"id"`
GRPCListenAddress string `json:"grpcListenAddress"`
GRPCPort int `json:"grpcPort"`
ECPacketSize int64 `json:"ecPacketSize"`
LocalIP string `json:"localIP"`
ExternalIP string `json:"externalIP"`
StorageBaseDir string `json:"storageBaseDir"`
TempFileLifetime int `json:"tempFileLifetime"` // temp状态的副本最多能保持多久时间,单位:秒
Logger log.Config `json:"logger"`
RabbitMQ stgmq.Config `json:"rabbitMQ"`
IPFS ipfs.Config `json:"ipfs"`
DistLock distlock.Config `json:"distlock"`
ID int64 `json:"id"`
Local stgmodels.LocalMachineInfo `json:"local"`
GRPC *grpc.Config `json:"grpc"`
ECPacketSize int64 `json:"ecPacketSize"`
StorageBaseDir string `json:"storageBaseDir"`
TempFileLifetime int `json:"tempFileLifetime"` // temp状态的副本最多能保持多久时间,单位:秒
Logger log.Config `json:"logger"`
RabbitMQ stgmq.Config `json:"rabbitMQ"`
IPFS ipfs.Config `json:"ipfs"`
DistLock distlock.Config `json:"distlock"`
}

var cfg Config


+ 0
- 19
internal/services/cmd/agent.go View File

@@ -1,19 +0,0 @@
package cmd

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-common/consts"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
)

func (svc *Service) GetState(msg *agtmq.GetState) (*agtmq.GetStateResp, *mq.CodeMessage) {
var ipfsState string

if svc.ipfs.IsUp() {
ipfsState = consts.IPFSStateOK
} else {
ipfsState = consts.IPFSStateOK
}

return mq.ReplyOK(agtmq.NewGetStateResp(ipfsState))
}

+ 0
- 21
internal/services/cmd/service.go View File

@@ -1,21 +0,0 @@
package cmd

import (
"gitlink.org.cn/cloudream/common/utils/ipfs"
"gitlink.org.cn/cloudream/storage-agent/internal/task"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

type Service struct {
ipfs *ipfs.IPFS
taskManager *task.Manager
coordinator *coormq.Client
}

func NewService(ipfs *ipfs.IPFS, taskMgr *task.Manager, coordinator *coormq.Client) *Service {
return &Service{
ipfs: ipfs,
taskManager: taskMgr,
coordinator: coordinator,
}
}

internal/services/grpc/grpc_service.go → internal/services/grpc/service.go View File

@@ -6,25 +6,29 @@ import (

log "gitlink.org.cn/cloudream/common/pkgs/logger"
myio "gitlink.org.cn/cloudream/common/utils/io"
"gitlink.org.cn/cloudream/common/utils/ipfs"
agentserver "gitlink.org.cn/cloudream/storage-common/pkgs/proto"
"gitlink.org.cn/cloudream/storage-common/globals"
agentserver "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent"
)

type GRPCService struct {
agentserver.FileTransportServer
ipfs *ipfs.IPFS
type Service struct {
agentserver.AgentServer
}

func NewService(ipfs *ipfs.IPFS) *GRPCService {
return &GRPCService{
ipfs: ipfs,
}
func NewService() *Service {
return &Service{}
}

func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer) error {
func (s *Service) SendIPFSFile(server agentserver.Agent_SendIPFSFileServer) error {
log.Debugf("client upload file")

writer, err := s.ipfs.CreateFile()
ipfsCli, err := globals.IPFSPool.Acquire()
if err != nil {
log.Warnf("new ipfs client: %s", err.Error())
return fmt.Errorf("new ipfs client: %w", err)
}
defer ipfsCli.Close()

writer, err := ipfsCli.CreateFileStream()
if err != nil {
log.Warnf("create file failed, err: %s", err.Error())
return fmt.Errorf("create file failed, err: %w", err)
@@ -45,18 +49,17 @@ func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer)
return fmt.Errorf("recv message failed, err: %w", err)
}

if msg.Type == agentserver.FileDataPacketType_Data {
err = myio.WriteAll(writer, msg.Data)
if err != nil {
// 关闭文件写入,不需要返回的hash和error
writer.Abort(io.ErrClosedPipe)
log.Warnf("write data to file failed, err: %s", err.Error())
return fmt.Errorf("write data to file failed, err: %w", err)
}
err = myio.WriteAll(writer, msg.Data)
if err != nil {
// 关闭文件写入,不需要返回的hash和error
writer.Abort(io.ErrClosedPipe)
log.Warnf("write data to file failed, err: %s", err.Error())
return fmt.Errorf("write data to file failed, err: %w", err)
}

recvSize += int64(len(msg.Data))
recvSize += int64(len(msg.Data))

} else if msg.Type == agentserver.FileDataPacketType_EOF {
if msg.Type == agentserver.FileDataPacketType_EOF {
// 客户端明确说明文件传输已经结束,那么结束写入,获得文件Hash
hash, err := writer.Finish()
if err != nil {
@@ -65,7 +68,7 @@ func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer)
}

// 并将结果返回到客户端
err = server.SendAndClose(&agentserver.SendResp{
err = server.SendAndClose(&agentserver.SendIPFSFileResp{
FileHash: hash,
})
if err != nil {
@@ -78,10 +81,17 @@ func (s *GRPCService) SendFile(server agentserver.FileTransport_SendFileServer)
}
}

func (s *GRPCService) GetFile(req *agentserver.GetReq, server agentserver.FileTransport_GetFileServer) error {
func (s *Service) GetIPFSFile(req *agentserver.GetIPFSFileReq, server agentserver.Agent_GetIPFSFileServer) error {
log.WithField("FileHash", req.FileHash).Debugf("client download file")

reader, err := s.ipfs.OpenRead(req.FileHash)
ipfsCli, err := globals.IPFSPool.Acquire()
if err != nil {
log.Warnf("new ipfs client: %s", err.Error())
return fmt.Errorf("new ipfs client: %w", err)
}
defer ipfsCli.Close()

reader, err := ipfsCli.OpenRead(req.FileHash)
if err != nil {
log.Warnf("open file %s to read failed, err: %s", req.FileHash, err.Error())
return fmt.Errorf("open file to read failed, err: %w", err)

+ 29
- 0
internal/services/mq/agent.go View File

@@ -0,0 +1,29 @@
package mq

import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-common/consts"
"gitlink.org.cn/cloudream/storage-common/globals"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
)

func (svc *Service) GetState(msg *agtmq.GetState) (*agtmq.GetStateResp, *mq.CodeMessage) {
var ipfsState string

ipfsCli, err := globals.IPFSPool.Acquire()
if err != nil {
logger.Warnf("new ipfs client: %s", err.Error())
ipfsState = consts.IPFSStateUnavailable

} else {
if ipfsCli.IsUp() {
ipfsState = consts.IPFSStateOK
} else {
ipfsState = consts.IPFSStateUnavailable
}
ipfsCli.Close()
}

return mq.ReplyOK(agtmq.NewGetStateResp(ipfsState))
}

internal/services/cmd/ipfs.go → internal/services/mq/cache.go View File

@@ -1,34 +1,44 @@
package cmd
package mq

import (
"time"

shell "github.com/ipfs/go-ipfs-api"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/ipfs"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-agent/internal/config"
"gitlink.org.cn/cloudream/storage-agent/internal/task"
mytask "gitlink.org.cn/cloudream/storage-agent/internal/task"
"gitlink.org.cn/cloudream/storage-common/consts"
"gitlink.org.cn/cloudream/storage-common/globals"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
)

func (svc *Service) CheckIPFS(msg *agtmq.CheckIPFS) (*agtmq.CheckIPFSResp, *mq.CodeMessage) {
filesMap, err := svc.ipfs.GetPinnedFiles()
func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *mq.CodeMessage) {
ipfsCli, err := globals.IPFSPool.Acquire()
if err != nil {
logger.Warnf("new ipfs client: %s", err.Error())
return mq.ReplyFailed[agtmq.CheckCacheResp](errorcode.OperationFailed, "new ipfs client failed")
}
defer ipfsCli.Close()

filesMap, err := ipfsCli.GetPinnedFiles()
if err != nil {
logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error())
return mq.ReplyFailed[agtmq.CheckIPFSResp](errorcode.OperationFailed, "get pinned files from ipfs failed")
return mq.ReplyFailed[agtmq.CheckCacheResp](errorcode.OperationFailed, "get pinned files from ipfs failed")
}

// TODO 根据锁定清单过滤被锁定的文件的记录
if msg.IsComplete {
return svc.checkComplete(msg, filesMap)
return svc.checkComplete(msg, filesMap, ipfsCli)
} else {
return svc.checkIncrement(msg, filesMap)
return svc.checkIncrement(msg, filesMap, ipfsCli)
}
}

func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmq.CheckIPFSResp, *mq.CodeMessage) {
func (svc *Service) checkIncrement(msg *agtmq.CheckCache, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckCacheResp, *mq.CodeMessage) {
var entries []agtmq.CheckIPFSRespEntry
for _, cache := range msg.Caches {
_, ok := filesMap[cache.FileHash]
@@ -37,7 +47,7 @@ func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]she
// 不处理
} else if cache.State == consts.CacheStateTemp {
logger.WithField("FileHash", cache.FileHash).Debugf("unpin for cache entry state is temp")
err := svc.ipfs.Unpin(cache.FileHash)
err := ipfsCli.Unpin(cache.FileHash)
if err != nil {
logger.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error())
}
@@ -52,7 +62,7 @@ func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]she

} else if cache.State == consts.CacheStateTemp {
if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second {
entries = append(entries, agtmq.NewCheckIPFSRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP))
entries = append(entries, agtmq.NewCheckCacheRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP))
}
}
}
@@ -60,10 +70,10 @@ func (svc *Service) checkIncrement(msg *agtmq.CheckIPFS, filesMap map[string]she

// 增量情况下,不需要对filesMap中没检查的记录进行处理

return mq.ReplyOK(agtmq.NewCheckIPFSResp(entries))
return mq.ReplyOK(agtmq.NewCheckCacheResp(entries))
}

func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmq.CheckIPFSResp, *mq.CodeMessage) {
func (svc *Service) checkComplete(msg *agtmq.CheckCache, filesMap map[string]shell.PinInfo, ipfsCli *ipfs.PoolClient) (*agtmq.CheckCacheResp, *mq.CodeMessage) {
var entries []agtmq.CheckIPFSRespEntry
for _, cache := range msg.Caches {
_, ok := filesMap[cache.FileHash]
@@ -72,7 +82,7 @@ func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shel
// 不处理
} else if cache.State == consts.CacheStateTemp {
logger.WithField("FileHash", cache.FileHash).Debugf("unpin for cache entry state is temp")
err := svc.ipfs.Unpin(cache.FileHash)
err := ipfsCli.Unpin(cache.FileHash)
if err != nil {
logger.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error())
}
@@ -87,7 +97,7 @@ func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shel

} else if cache.State == consts.CacheStateTemp {
if time.Since(cache.CacheTime) > time.Duration(config.Cfg().TempFileLifetime)*time.Second {
entries = append(entries, agtmq.NewCheckIPFSRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP))
entries = append(entries, agtmq.NewCheckCacheRespEntry(cache.FileHash, agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP))
}
}
}
@@ -96,12 +106,48 @@ func (svc *Service) checkComplete(msg *agtmq.CheckIPFS, filesMap map[string]shel
// map中剩下的数据是没有被遍历过,即Cache中没有记录的,那么就Unpin文件,并产生一条Temp记录
for hash := range filesMap {
logger.WithField("FileHash", hash).Debugf("unpin for no cacah entry")
err := svc.ipfs.Unpin(hash)
err := ipfsCli.Unpin(hash)
if err != nil {
logger.WithField("FileHash", hash).Warnf("unpin file failed, err: %s", err.Error())
}
entries = append(entries, agtmq.NewCheckIPFSRespEntry(hash, agtmq.CHECK_IPFS_RESP_OP_CREATE_TEMP))
entries = append(entries, agtmq.NewCheckCacheRespEntry(hash, agtmq.CHECK_IPFS_RESP_OP_CREATE_TEMP))
}

return mq.ReplyOK(agtmq.NewCheckCacheResp(entries))
}

func (svc *Service) StartCacheMovePackage(msg *agtmq.StartCacheMovePackage) (*agtmq.StartCacheMovePackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.StartNew(mytask.NewCacheMovePackage(msg.UserID, msg.PackageID))
return mq.ReplyOK(agtmq.NewStartCacheMovePackageResp(tsk.ID()))
}

func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtmq.WaitCacheMovePackageResp, *mq.CodeMessage) {
tsk := svc.taskManager.FindByID(msg.TaskID)
if tsk == nil {
return mq.ReplyFailed[agtmq.WaitCacheMovePackageResp](errorcode.TaskNotFound, "task not found")
}

return mq.ReplyOK(agtmq.NewCheckIPFSResp(entries))
if msg.WaitTimeoutMs == 0 {
tsk.Wait()

errMsg := ""
if tsk.Error() != nil {
errMsg = tsk.Error().Error()
}

return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg))

} else {
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) {

errMsg := ""
if tsk.Error() != nil {
errMsg = tsk.Error().Error()
}

return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg))
}

return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(false, ""))
}
}

internal/services/cmd/object.go → internal/services/mq/object.go View File

@@ -1,4 +1,4 @@
package cmd
package mq

import (
"time"

+ 15
- 0
internal/services/mq/service.go View File

@@ -0,0 +1,15 @@
package mq

import (
"gitlink.org.cn/cloudream/storage-agent/internal/task"
)

type Service struct {
taskManager *task.Manager
}

func NewService(taskMgr *task.Manager) *Service {
return &Service{
taskManager: taskMgr,
}
}

internal/services/cmd/storage.go → internal/services/mq/storage.go View File

@@ -1,30 +1,36 @@
package cmd
package mq

import (
"io/fs"
"io/ioutil"
"os"
"path/filepath"
"time"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-agent/internal/config"
mytask "gitlink.org.cn/cloudream/storage-agent/internal/task"
"gitlink.org.cn/cloudream/storage-common/consts"
"gitlink.org.cn/cloudream/storage-common/utils"

"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/mq"
stgcmd "gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
myos "gitlink.org.cn/cloudream/storage-common/utils/os"
"gitlink.org.cn/cloudream/storage-common/utils"
)

func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage) (*agtmq.StartStorageMovePackageResp, *mq.CodeMessage) {
getStgResp, err := svc.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID))
func (svc *Service) StartStorageLoadPackage(msg *agtmq.StartStorageLoadPackage) (*agtmq.StartStorageLoadPackageResp, *mq.CodeMessage) {
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
logger.Warnf("new coordinator client: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed")
}
defer coorCli.Close()

getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID))
if err != nil {
logger.WithField("StorageID", msg.StorageID).
Warnf("getting storage info: %s", err.Error())
@@ -32,7 +38,7 @@ func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage)
return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed")
}

outputDirPath := filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, utils.MakeStorageMovePackageDirName(msg.PackageID, msg.UserID))
outputDirPath := filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, utils.MakeStorageLoadPackageDirName(msg.PackageID, msg.UserID))
if err = os.MkdirAll(outputDirPath, 0755); err != nil {
logger.WithField("StorageID", msg.StorageID).
Warnf("creating output directory: %s", err.Error())
@@ -40,16 +46,16 @@ func (svc *Service) StartStorageMovePackage(msg *agtmq.StartStorageMovePackage)
return nil, mq.Failed(errorcode.OperationFailed, "create output directory failed")
}

tsk := svc.taskManager.StartNew(stgcmd.Wrap[mytask.TaskContext](stgcmd.NewDownloadPackage(msg.UserID, msg.PackageID, outputDirPath)))
return mq.ReplyOK(agtmq.NewStartStorageMovePackageResp(tsk.ID()))
tsk := svc.taskManager.StartNew(mytask.NewDownloadPackage(msg.UserID, msg.PackageID, outputDirPath))
return mq.ReplyOK(agtmq.NewStartStorageLoadPackageResp(tsk.ID()))
}

func (svc *Service) WaitStorageMovePackage(msg *agtmq.WaitStorageMovePackage) (*agtmq.WaitStorageMovePackageResp, *mq.CodeMessage) {
logger.WithField("TaskID", msg.TaskID).Debugf("wait moving package")
func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (*agtmq.WaitStorageLoadPackageResp, *mq.CodeMessage) {
logger.WithField("TaskID", msg.TaskID).Debugf("wait loading package")

tsk := svc.taskManager.FindByID(msg.TaskID)
if tsk == nil {
return mq.ReplyFailed[agtmq.WaitStorageMovePackageResp](errorcode.TaskNotFound, "task not found")
return mq.ReplyFailed[agtmq.WaitStorageLoadPackageResp](errorcode.TaskNotFound, "task not found")
}

if msg.WaitTimeoutMs == 0 {
@@ -60,7 +66,7 @@ func (svc *Service) WaitStorageMovePackage(msg *agtmq.WaitStorageMovePackage) (*
errMsg = tsk.Error().Error()
}

return mq.ReplyOK(agtmq.NewWaitStorageMovePackageResp(true, errMsg))
return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg))

} else {
if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) {
@@ -70,17 +76,17 @@ func (svc *Service) WaitStorageMovePackage(msg *agtmq.WaitStorageMovePackage) (*
errMsg = tsk.Error().Error()
}

return mq.ReplyOK(agtmq.NewWaitStorageMovePackageResp(true, errMsg))
return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg))
}

return mq.ReplyOK(agtmq.NewWaitStorageMovePackageResp(false, ""))
return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(false, ""))
}
}

func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckResp, *mq.CodeMessage) {
dirFullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory)

infos, err := ioutil.ReadDir(dirFullPath)
infos, err := os.ReadDir(dirFullPath)
if err != nil {
logger.Warnf("list storage directory failed, err: %s", err.Error())
return mq.ReplyOK(agtmq.NewStorageCheckResp(
@@ -89,30 +95,30 @@ func (svc *Service) StorageCheck(msg *agtmq.StorageCheck) (*agtmq.StorageCheckRe
))
}

fileInfos := lo.Filter(infos, func(info fs.FileInfo, index int) bool { return !info.IsDir() })
dirInfos := lo.Filter(infos, func(info fs.DirEntry, index int) bool { return info.IsDir() })

if msg.IsComplete {
return svc.checkStorageComplete(msg, fileInfos)
return svc.checkStorageComplete(msg, dirInfos)
} else {
return svc.checkStorageIncrement(msg, fileInfos)
return svc.checkStorageIncrement(msg, dirInfos)
}
}

func (svc *Service) checkStorageIncrement(msg *agtmq.StorageCheck, fileInfos []fs.FileInfo) (*agtmq.StorageCheckResp, *mq.CodeMessage) {
infosMap := make(map[string]fs.FileInfo)
for _, info := range fileInfos {
func (svc *Service) checkStorageIncrement(msg *agtmq.StorageCheck, dirInfos []fs.DirEntry) (*agtmq.StorageCheckResp, *mq.CodeMessage) {
infosMap := make(map[string]fs.DirEntry)
for _, info := range dirInfos {
infosMap[info.Name()] = info
}

var entries []agtmq.StorageCheckRespEntry
for _, obj := range msg.Packages {
fileName := utils.MakeStorageMovePackageDirName(obj.PackageID, obj.UserID)
_, ok := infosMap[fileName]
dirName := utils.MakeStorageLoadPackageDirName(obj.PackageID, obj.UserID)
_, ok := infosMap[dirName]

if ok {
// 不需要做处理
// 删除map中的记录,表示此记录已被检查过
delete(infosMap, fileName)
delete(infosMap, dirName)

} else {
// 只要文件不存在,就删除StoragePackage表中的记录
@@ -125,22 +131,22 @@ func (svc *Service) checkStorageIncrement(msg *agtmq.StorageCheck, fileInfos []f
return mq.ReplyOK(agtmq.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries))
}

func (svc *Service) checkStorageComplete(msg *agtmq.StorageCheck, fileInfos []fs.FileInfo) (*agtmq.StorageCheckResp, *mq.CodeMessage) {
func (svc *Service) checkStorageComplete(msg *agtmq.StorageCheck, dirInfos []fs.DirEntry) (*agtmq.StorageCheckResp, *mq.CodeMessage) {

infosMap := make(map[string]fs.FileInfo)
for _, info := range fileInfos {
infosMap := make(map[string]fs.DirEntry)
for _, info := range dirInfos {
infosMap[info.Name()] = info
}

var entries []agtmq.StorageCheckRespEntry
for _, obj := range msg.Packages {
fileName := utils.MakeStorageMovePackageDirName(obj.PackageID, obj.UserID)
_, ok := infosMap[fileName]
dirName := utils.MakeStorageLoadPackageDirName(obj.PackageID, obj.UserID)
_, ok := infosMap[dirName]

if ok {
// 不需要做处理
// 删除map中的记录,表示此记录已被检查过
delete(infosMap, fileName)
delete(infosMap, dirName)

} else {
// 只要文件不存在,就删除StoragePackage表中的记录
@@ -152,7 +158,15 @@ func (svc *Service) checkStorageComplete(msg *agtmq.StorageCheck, fileInfos []fs
}

func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePackage) (*agtmq.StartStorageCreatePackageResp, *mq.CodeMessage) {
getStgResp, err := svc.coordinator.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID))
coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
logger.Warnf("new coordinator client: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "new coordinator client failed")
}
defer coorCli.Close()

getStgResp, err := coorCli.GetStorageInfo(coormq.NewGetStorageInfo(msg.UserID, msg.StorageID))
if err != nil {
logger.WithField("StorageID", msg.StorageID).
Warnf("getting storage info: %s", err.Error())
@@ -160,7 +174,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka
return nil, mq.Failed(errorcode.OperationFailed, "get storage info failed")
}

fullPath := filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, msg.Path)
fullPath := filepath.Clean(filepath.Join(config.Cfg().StorageBaseDir, getStgResp.Directory, msg.Path))

var uploadFilePathes []string
err = filepath.WalkDir(fullPath, func(fname string, fi os.DirEntry, err error) error {
@@ -180,7 +194,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka
return nil, mq.Failed(errorcode.OperationFailed, "read directory failed")
}

objIter := myos.NewUploadingObjectIterator(fullPath, uploadFilePathes)
objIter := iterator.NewUploadingObjectIterator(fullPath, uploadFilePathes)

if msg.Redundancy.Type == models.RedundancyRep {
repInfo, err := msg.Redundancy.ToRepInfo()
@@ -190,14 +204,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka
return nil, mq.Failed(errorcode.OperationFailed, "get rep redundancy info failed")
}

tsk := svc.taskManager.StartNew(stgcmd.Wrap[mytask.TaskContext](
stgcmd.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo, stgcmd.UploadConfig{
LocalIPFS: svc.ipfs,
LocalNodeID: &config.Cfg().ID,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
})))
tsk := svc.taskManager.StartNew(mytask.NewCreateRepPackage(msg.UserID, msg.BucketID, msg.Name, objIter, repInfo))
return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID()))
}

@@ -208,14 +215,7 @@ func (svc *Service) StartStorageCreatePackage(msg *agtmq.StartStorageCreatePacka
return nil, mq.Failed(errorcode.OperationFailed, "get ec redundancy info failed")
}

tsk := svc.taskManager.StartNew(stgcmd.Wrap[mytask.TaskContext](
stgcmd.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo, config.Cfg().ECPacketSize, stgcmd.UploadConfig{
LocalIPFS: svc.ipfs,
LocalNodeID: &config.Cfg().ID,
ExternalIP: config.Cfg().ExternalIP,
GRPCPort: config.Cfg().GRPCPort,
MQ: &config.Cfg().RabbitMQ,
})))
tsk := svc.taskManager.StartNew(mytask.NewCreateECPackage(msg.UserID, msg.BucketID, msg.Name, objIter, ecInfo))
return mq.ReplyOK(agtmq.NewStartStorageCreatePackageResp(tsk.ID()))
}

@@ -231,17 +231,16 @@ func (svc *Service) WaitStorageCreatePackage(msg *agtmq.WaitStorageCreatePackage
return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(false, "", 0))
}

wrapTask := tsk.Body().(*stgcmd.TaskWrapper[mytask.TaskContext])

if tsk.Error() != nil {
return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, tsk.Error().Error(), 0))
}

if repTask, ok := wrapTask.InnerTask().(*stgcmd.CreateRepPackage); ok {
// TODO 避免判断类型
if repTask, ok := tsk.Body().(*mytask.CreateRepPackage); ok {
return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", repTask.Result.PackageID))
}

if ecTask, ok := wrapTask.InnerTask().(*stgcmd.CreateECPackage); ok {
if ecTask, ok := tsk.Body().(*mytask.CreateECPackage); ok {
return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(true, "", ecTask.Result.PackageID))
}


+ 102
- 0
internal/task/cache_move_package.go View File

@@ -0,0 +1,102 @@
package task

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

type CacheMovePackage struct {
userID int64
packageID int64
}

func NewCacheMovePackage(userID int64, packageID int64) *CacheMovePackage {
return &CacheMovePackage{
userID: userID,
packageID: packageID,
}
}

func (t *CacheMovePackage) Execute(ctx TaskContext, complete CompleteFn) {
err := t.do(ctx)
complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

func (t *CacheMovePackage) do(ctx TaskContext) error {
log := logger.WithType[CacheMovePackage]("Task")
log.Debugf("begin with %v", logger.FormatStruct(t))
defer log.Debugf("end")

// TOOD EC的锁
mutex, err := reqbuilder.NewBuilder().
Metadata().
// 读取Package信息和包含的Object信息
Package().ReadOne(t.packageID).Object().ReadAny().
// 读取Rep对象的配置
ObjectRep().ReadAny().
// 创建Cache记录
Cache().CreateAny().
IPFS().
// pin文件
CreateAnyRep(*globals.Local.NodeID).
MutexLock(ctx.distlock)
if err != nil {
return fmt.Errorf("acquiring distlock: %w", err)
}
defer mutex.Unlock()

coorCli, err := globals.CoordinatorMQPool.Acquire()
if err != nil {
return fmt.Errorf("new coordinator client: %w", err)
}
defer coorCli.Close()

pkgResp, err := coorCli.GetPackage(coormq.NewGetPackage(t.userID, t.packageID))
if err != nil {
return fmt.Errorf("getting package: %w", err)
}

if pkgResp.Redundancy.IsRepInfo() {
return t.moveRep(ctx, coorCli, pkgResp.Package)
} else {
// TODO EC的CacheMove逻辑
}

return nil
}
func (t *CacheMovePackage) moveRep(ctx TaskContext, coorCli *coormq.PoolClient, pkg model.Package) error {
getRepResp, err := coorCli.GetPackageObjectRepData(coormq.NewGetPackageObjectRepData(pkg.PackageID))
if err != nil {
return fmt.Errorf("getting package object rep data: %w", err)
}

ipfsCli, err := globals.IPFSPool.Acquire()
if err != nil {
return fmt.Errorf("new ipfs client: %w", err)
}
defer ipfsCli.Close()

var fileHashes []string
for _, rep := range getRepResp.Data {
if err := ipfsCli.Pin(rep.FileHash); err != nil {
return fmt.Errorf("pinning file %s: %w", rep.FileHash, err)
}

fileHashes = append(fileHashes, rep.FileHash)
}

_, err = coorCli.CachePackageMoved(coormq.NewCachePackageMoved(pkg.PackageID, *globals.Local.NodeID, fileHashes))
if err != nil {
return fmt.Errorf("reporting cache package moved: %w", err)
}

return nil
}

+ 43
- 0
internal/task/create_ec_package.go View File

@@ -0,0 +1,43 @@
package task

import (
"time"

"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-agent/internal/config"
"gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
)

type CreateECPackageResult = cmd.CreateECPackageResult

type CreateECPackage struct {
cmd cmd.CreateECPackage

Result *CreateECPackageResult
}

func NewCreateECPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.ECRedundancyInfo) *CreateECPackage {
return &CreateECPackage{
cmd: *cmd.NewCreateECPackage(userID, bucketID, name, objIter, redundancy),
}
}

func (t *CreateECPackage) Execute(ctx TaskContext, complete CompleteFn) {
log := logger.WithType[CreateECPackage]("Task")
log.Debugf("begin")
defer log.Debugf("end")

ret, err := t.cmd.Execute(&cmd.UpdateECPackageContext{
UpdatePackageContext: &cmd.UpdatePackageContext{
Distlock: ctx.distlock,
},
ECPacketSize: config.Cfg().ECPacketSize,
})
t.Result = ret

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

+ 39
- 0
internal/task/create_rep_package.go View File

@@ -0,0 +1,39 @@
package task

import (
"time"

"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
"gitlink.org.cn/cloudream/storage-common/pkgs/iterator"
)

type CreateRepPackageResult = cmd.CreateRepPackageResult

type CreateRepPackage struct {
cmd cmd.CreateRepPackage

Result *CreateRepPackageResult
}

func NewCreateRepPackage(userID int64, bucketID int64, name string, objIter iterator.UploadingObjectIterator, redundancy models.RepRedundancyInfo) *CreateRepPackage {
return &CreateRepPackage{
cmd: *cmd.NewCreateRepPackage(userID, bucketID, name, objIter, redundancy),
}
}

func (t *CreateRepPackage) Execute(ctx TaskContext, complete CompleteFn) {
log := logger.WithType[CreateRepPackage]("Task")
log.Debugf("begin")
defer log.Debugf("end")

ret, err := t.cmd.Execute(&cmd.UpdatePackageContext{
Distlock: ctx.distlock,
})
t.Result = ret

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

+ 26
- 0
internal/task/download_package.go View File

@@ -0,0 +1,26 @@
package task

import (
"time"

"gitlink.org.cn/cloudream/storage-common/pkgs/cmd"
)

type DownloadPackage struct {
cmd *cmd.DownloadPackage
}

func NewDownloadPackage(userID int64, packageID int64, outputPath string) *DownloadPackage {
return &DownloadPackage{
cmd: cmd.NewDownloadPackage(userID, packageID, outputPath),
}
}
func (t *DownloadPackage) Execute(ctx TaskContext, complete CompleteFn) {
err := t.cmd.Execute(&cmd.DownloadPackageContext{
Distlock: ctx.distlock,
})

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
}

+ 14
- 1
internal/task/ipfs_pin.go View File

@@ -5,6 +5,7 @@ import (
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-common/globals"
)

type IPFSPin struct {
@@ -31,7 +32,19 @@ func (t *IPFSPin) Execute(ctx TaskContext, complete CompleteFn) {
log.Debugf("begin with %v", logger.FormatStruct(t))
defer log.Debugf("end")

err := ctx.ipfs.Pin(t.FileHash)
ipfsCli, err := globals.IPFSPool.Acquire()
if err != nil {
err := fmt.Errorf("new ipfs client: %w", err)
log.Warn(err.Error())

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
return
}
defer ipfsCli.Close()

err = ipfsCli.Pin(t.FileHash)
if err != nil {
err := fmt.Errorf("pin file failed, err: %w", err)
log.WithField("FileHash", t.FileHash).Warn(err.Error())


+ 14
- 1
internal/task/ipfs_read.go View File

@@ -8,6 +8,7 @@ import (
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-common/globals"
)

type IPFSRead struct {
@@ -61,7 +62,19 @@ func (t *IPFSRead) Execute(ctx TaskContext, complete CompleteFn) {
}
defer outputFile.Close()

rd, err := ctx.ipfs.OpenRead(t.FileHash)
ipfsCli, err := globals.IPFSPool.Acquire()
if err != nil {
err := fmt.Errorf("new ipfs client: %w", err)
log.Warn(err.Error())

complete(err, CompleteOption{
RemovingDelay: time.Minute,
})
return
}
defer ipfsCli.Close()

rd, err := ipfsCli.OpenRead(t.FileHash)
if err != nil {
err := fmt.Errorf("read ipfs file failed, err: %w", err)
log.WithField("FileHash", t.FileHash).Warn(err.Error())


+ 3
- 9
internal/task/task.go View File

@@ -3,14 +3,10 @@ package task
import (
distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
"gitlink.org.cn/cloudream/common/pkgs/task"
"gitlink.org.cn/cloudream/common/utils/ipfs"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
)

type TaskContext struct {
ipfs *ipfs.IPFS
coordinator *coormq.Client
distlock *distsvc.Service
distlock *distsvc.Service
}

// 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用,
@@ -25,10 +21,8 @@ type Task = task.Task[TaskContext]

type CompleteOption = task.CompleteOption

func NewManager(ipfs *ipfs.IPFS, coorCli *coormq.Client, distLock *distsvc.Service) Manager {
func NewManager(distlock *distsvc.Service) Manager {
return task.NewManager(TaskContext{
ipfs: ipfs,
coordinator: coorCli,
distlock: distLock,
distlock: distlock,
})
}

+ 16
- 20
main.go View File

@@ -6,20 +6,19 @@ import (
"os"
"sync"

distsvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/ipfs"
"gitlink.org.cn/cloudream/storage-agent/internal/config"
"gitlink.org.cn/cloudream/storage-agent/internal/task"
agentserver "gitlink.org.cn/cloudream/storage-common/pkgs/proto"
"gitlink.org.cn/cloudream/storage-common/globals"
"gitlink.org.cn/cloudream/storage-common/pkgs/distlock"
agtrpc "gitlink.org.cn/cloudream/storage-common/pkgs/grpc/agent"

"google.golang.org/grpc"

agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"

cmdsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/cmd"
grpcsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/grpc"
cmdsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/mq"
)

// TODO 此数据是否在运行时会发生变化?
@@ -41,17 +40,14 @@ func main() {
os.Exit(1)
}

ipfs, err := ipfs.NewIPFS(&config.Cfg().IPFS)
if err != nil {
log.Fatalf("new ipfs failed, err: %s", err.Error())
}

coorCli, err := coormq.NewClient(&config.Cfg().RabbitMQ)
if err != nil {
log.Fatalf("new ipfs failed, err: %s", err.Error())
}
globals.InitLocal(&config.Cfg().Local)
globals.InitMQPool(&config.Cfg().RabbitMQ)
globals.InitAgentRPCPool(&agtrpc.PoolConfig{
Port: config.Cfg().GRPC.Port,
})
globals.InitIPFSPool(&config.Cfg().IPFS)

distlock, err := distsvc.NewService(&config.Cfg().DistLock)
distlock, err := distlock.NewService(&config.Cfg().DistLock)
if err != nil {
log.Fatalf("new ipfs failed, err: %s", err.Error())
}
@@ -60,11 +56,11 @@ func main() {
wg := sync.WaitGroup{}
wg.Add(5)

taskMgr := task.NewManager(ipfs, coorCli, distlock)
taskMgr := task.NewManager(distlock)

// 启动命令服务器
// TODO 需要设计AgentID持久化机制
agtSvr, err := agtmq.NewServer(cmdsvc.NewService(ipfs, &taskMgr, coorCli), config.Cfg().ID, &config.Cfg().RabbitMQ)
agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr), config.Cfg().ID, &config.Cfg().RabbitMQ)
if err != nil {
log.Fatalf("new agent server failed, err: %s", err.Error())
}
@@ -76,14 +72,14 @@ func main() {
go reportStatus(&wg) //网络延迟感知

//面向客户端收发数据
listenAddr := config.Cfg().GRPCListenAddress
listenAddr := config.Cfg().GRPC.MakeListenAddress()
lis, err := net.Listen("tcp", listenAddr)
if err != nil {
log.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error())
}

s := grpc.NewServer()
agentserver.RegisterFileTransportServer(s, grpcsvc.NewService(ipfs))
agtrpc.RegisterAgentServer(s, grpcsvc.NewService())
go serveGRPC(s, lis, &wg)

go serveDistLock(distlock)
@@ -119,7 +115,7 @@ func serveGRPC(s *grpc.Server, lis net.Listener, wg *sync.WaitGroup) {
wg.Done()
}

func serveDistLock(svc *distsvc.Service) {
func serveDistLock(svc *distlock.Service) {
log.Info("start serving distlock")

err := svc.Serve()


+ 3
- 4
status_report.go View File

@@ -7,13 +7,12 @@ import (
log "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage-agent/internal/config"
"gitlink.org.cn/cloudream/storage-common/consts"
coorcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/coordinator"
coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator"
coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage-common/utils"
)

func reportStatus(wg *sync.WaitGroup) {
coorCli, err := coorcli.NewClient(&config.Cfg().RabbitMQ)
coorCli, err := coormq.NewClient(&config.Cfg().RabbitMQ)
if err != nil {
wg.Done()
log.Error("new coordinator client failed, err: %w", err)
@@ -56,7 +55,7 @@ func reportStatus(wg *sync.WaitGroup) {

//发送心跳
// TODO 由于数据结构未定,暂时不发送真实数据
coorCli.AgentStatusReport(coormsg.NewAgentStatusReportBody(config.Cfg().ID, []int64{}, []int{}, ipfsStatus, localDirStatus))
coorCli.AgentStatusReport(coormq.NewAgentStatusReportBody(config.Cfg().ID, []int64{}, []int{}, ipfsStatus, localDirStatus))

time.Sleep(time.Minute * 5)
}


Loading…
Cancel
Save