Browse Source

优化目录结构

gitlink
Sydonian 1 year ago
parent
commit
87f978e123
37 changed files with 86 additions and 346 deletions
  1. +2
    -2
      agent/internal/cmd/serve.go
  2. +1
    -1
      agent/internal/grpc/io.go
  3. +4
    -4
      agent/internal/grpc/service.go
  4. +1
    -1
      agent/internal/http/hub_io.go
  5. +4
    -4
      agent/internal/http/service.go
  6. +2
    -2
      agent/internal/mq/cache.go
  7. +4
    -4
      agent/internal/mq/service.go
  8. +1
    -1
      agent/internal/task/cache_move_package.go
  9. +4
    -4
      agent/internal/task/task.go
  10. +4
    -4
      client/internal/task/task.go
  11. +5
    -5
      client/main.go
  12. +4
    -4
      common/pkgs/downloader/downloader.go
  13. +1
    -1
      common/pkgs/downloader/iterator.go
  14. +1
    -1
      common/pkgs/downloader/lrc_strip_iterator.go
  15. +1
    -1
      common/pkgs/downloader/strip_iterator.go
  16. +3
    -3
      common/pkgs/ioswitch2/ops2/bypass.go
  17. +4
    -4
      common/pkgs/ioswitch2/ops2/faas.go
  18. +5
    -5
      common/pkgs/ioswitch2/ops2/shard_store.go
  19. +3
    -3
      common/pkgs/ioswitch2/ops2/shared_store.go
  20. +5
    -5
      common/pkgs/ioswitchlrc/ops2/shard_store.go
  21. +1
    -1
      common/pkgs/storage/agtpool/pool.go
  22. +0
    -80
      common/pkgs/storage/cos/multiPartUploader.go
  23. +6
    -6
      common/pkgs/storage/local/agent.go
  24. +1
    -1
      common/pkgs/storage/local/local.go
  25. +2
    -2
      common/pkgs/storage/local/shard_store.go
  26. +2
    -2
      common/pkgs/storage/local/shared_store.go
  27. +0
    -1
      common/pkgs/storage/obs/faas.go
  28. +0
    -90
      common/pkgs/storage/obs/multiPartUploader.go
  29. +0
    -1
      common/pkgs/storage/obs/obs.go
  30. +0
    -88
      common/pkgs/storage/oss/multiPartUploader.go
  31. +0
    -0
      common/pkgs/storage/s3/agent.go
  32. +2
    -2
      common/pkgs/storage/s3/s3_test.go
  33. +1
    -1
      common/pkgs/uploader/create_load.go
  34. +1
    -1
      common/pkgs/uploader/update.go
  35. +4
    -4
      common/pkgs/uploader/uploader.go
  36. +4
    -4
      scanner/internal/event/event.go
  37. +3
    -3
      scanner/main.go

+ 2
- 2
agent/internal/cmd/serve.go View File

@@ -22,7 +22,7 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader/strategy"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
"gitlink.org.cn/cloudream/storage/common/pkgs/metacache"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
"gitlink.org.cn/cloudream/storage/common/pkgs/uploader"

"google.golang.org/grpc"
@@ -55,7 +55,7 @@ func serve(configPath string) {
hubCfg := downloadHubConfig()

// 初始化存储服务管理器
stgAgts := svcmgr.NewPool()
stgAgts := agtpool.NewPool()
for _, stg := range hubCfg.Storages {
err := stgAgts.SetupAgent(stg)
if err != nil {


+ 1
- 1
agent/internal/grpc/io.go View File

@@ -29,7 +29,7 @@ func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanRe
defer s.swWorker.Remove(sw)

execCtx := exec.NewWithContext(ctx)
exec.SetValueByType(execCtx, s.stgMgr)
exec.SetValueByType(execCtx, s.stgAgts)
_, err = sw.Run(execCtx)
if err != nil {
log.Warnf("running io plan: %v", err)


+ 4
- 4
agent/internal/grpc/service.go View File

@@ -3,18 +3,18 @@ package grpc
import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
agentserver "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
)

type Service struct {
agentserver.AgentServer
swWorker *exec.Worker
stgMgr *svcmgr.AgentPool
stgAgts *agtpool.AgentPool
}

func NewService(swWorker *exec.Worker, stgMgr *svcmgr.AgentPool) *Service {
func NewService(swWorker *exec.Worker, stgAgts *agtpool.AgentPool) *Service {
return &Service{
swWorker: swWorker,
stgMgr: stgMgr,
stgAgts: stgAgts,
}
}

+ 1
- 1
agent/internal/http/hub_io.go View File

@@ -162,7 +162,7 @@ func (s *IOService) ExecuteIOPlan(ctx *gin.Context) {
defer s.svc.swWorker.Remove(sw)

execCtx := exec.NewWithContext(ctx.Request.Context())
exec.SetValueByType(execCtx, s.svc.stgMgr)
exec.SetValueByType(execCtx, s.svc.stgAgts)
_, err = sw.Run(execCtx)
if err != nil {
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("executing plan: %v", err)))


+ 4
- 4
agent/internal/http/service.go View File

@@ -2,17 +2,17 @@ package http

import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
)

type Service struct {
swWorker *exec.Worker
stgMgr *svcmgr.AgentPool
stgAgts *agtpool.AgentPool
}

func NewService(swWorker *exec.Worker, stgMgr *svcmgr.AgentPool) *Service {
func NewService(swWorker *exec.Worker, stgAgts *agtpool.AgentPool) *Service {
return &Service{
swWorker: swWorker,
stgMgr: stgMgr,
stgAgts: stgAgts,
}
}

+ 2
- 2
agent/internal/mq/cache.go View File

@@ -12,7 +12,7 @@ import (
)

func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *mq.CodeMessage) {
store, err := svc.stgMgr.GetShardStore(msg.StorageID)
store, err := svc.stgAgts.GetShardStore(msg.StorageID)
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("get shard store of storage %v: %v", msg.StorageID, err))
}
@@ -31,7 +31,7 @@ func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *m
}

func (svc *Service) CacheGC(msg *agtmq.CacheGC) (*agtmq.CacheGCResp, *mq.CodeMessage) {
store, err := svc.stgMgr.GetShardStore(msg.StorageID)
store, err := svc.stgAgts.GetShardStore(msg.StorageID)
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("get shard store of storage %v: %v", msg.StorageID, err))
}


+ 4
- 4
agent/internal/mq/service.go View File

@@ -2,17 +2,17 @@ package mq

import (
"gitlink.org.cn/cloudream/storage/agent/internal/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
)

type Service struct {
taskManager *task.Manager
stgMgr *svcmgr.AgentPool
stgAgts *agtpool.AgentPool
}

func NewService(taskMgr *task.Manager, stgMgr *svcmgr.AgentPool) *Service {
func NewService(taskMgr *task.Manager, stgAgts *agtpool.AgentPool) *Service {
return &Service{
taskManager: taskMgr,
stgMgr: stgMgr,
stgAgts: stgAgts,
}
}

+ 1
- 1
agent/internal/task/cache_move_package.go View File

@@ -39,7 +39,7 @@ func (t *CacheMovePackage) do(ctx TaskContext) error {
log.Debugf("begin with %v", logger.FormatStruct(t))
defer log.Debugf("end")

store, err := ctx.stgMgr.GetShardStore(t.storageID)
store, err := ctx.stgAgts.GetShardStore(t.storageID)
if err != nil {
return fmt.Errorf("get shard store of storage %v: %w", t.storageID, err)
}


+ 4
- 4
agent/internal/task/task.go View File

@@ -6,7 +6,7 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/accessstat"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
"gitlink.org.cn/cloudream/storage/common/pkgs/uploader"
)

@@ -16,7 +16,7 @@ type TaskContext struct {
connectivity *connectivity.Collector
downloader *downloader.Downloader
accessStat *accessstat.AccessStat
stgMgr *svcmgr.AgentPool
stgAgts *agtpool.AgentPool
uploader *uploader.Uploader
}

@@ -35,13 +35,13 @@ type Task = task.Task[TaskContext]
// CompleteOption 类型定义了任务完成时的选项,可用于定制化任务完成的处理方式
type CompleteOption = task.CompleteOption

func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgMgr *svcmgr.AgentPool, uploader *uploader.Uploader) Manager {
func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgAgts *agtpool.AgentPool, uploader *uploader.Uploader) Manager {
return task.NewManager(TaskContext{
distlock: distlock,
connectivity: connectivity,
downloader: downloader,
accessStat: accessStat,
stgMgr: stgMgr,
stgAgts: stgAgts,
uploader: uploader,
})
}

+ 4
- 4
client/internal/task/task.go View File

@@ -4,14 +4,14 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/distlock" // 引入分布式锁服务
"gitlink.org.cn/cloudream/common/pkgs/task" // 引入任务处理相关的包
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" // 引入网络连接状态收集器
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
)

// TaskContext 定义了任务执行的上下文环境,包含分布式锁服务和网络连接状态收集器
type TaskContext struct {
distlock *distlock.Service
connectivity *connectivity.Collector
stgMgr *svcmgr.AgentPool
stgAgts *agtpool.AgentPool
}

// CompleteFn 类型定义了任务完成时的回调函数,用于设置任务的执行结果
@@ -31,10 +31,10 @@ type CompleteOption = task.CompleteOption

// NewManager 创建一个新的任务管理器实例,接受一个分布式锁服务和一个网络连接状态收集器作为参数
// 返回一个初始化好的任务管理器实例
func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, stgMgr *svcmgr.AgentPool) Manager {
func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, stgAgts *agtpool.AgentPool) Manager {
return task.NewManager(TaskContext{
distlock: distlock,
connectivity: connectivity,
stgMgr: stgMgr,
stgAgts: stgAgts,
})
}

+ 5
- 5
client/main.go View File

@@ -21,7 +21,7 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader/strategy"
"gitlink.org.cn/cloudream/storage/common/pkgs/metacache"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
"gitlink.org.cn/cloudream/storage/common/pkgs/uploader"
)

@@ -99,18 +99,18 @@ func main() {
go serveAccessStat(acStat)

// 存储管理器
stgMgr := svcmgr.NewPool()
stgAgts := agtpool.NewPool()

// 任务管理器
taskMgr := task.NewManager(distlockSvc, &conCol, stgMgr)
taskMgr := task.NewManager(distlockSvc, &conCol, stgAgts)

strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta)

// 下载器
dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgMgr, strgSel)
dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgAgts, strgSel)

// 上传器
uploader := uploader.NewUploader(distlockSvc, &conCol, stgMgr, stgMeta)
uploader := uploader.NewUploader(distlockSvc, &conCol, stgAgts, stgMeta)

svc, err := services.NewService(distlockSvc, &taskMgr, &dlder, acStat, uploader, strgSel, stgMeta)
if err != nil {


+ 4
- 4
common/pkgs/downloader/downloader.go View File

@@ -12,7 +12,7 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader/strategy"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
)

const (
@@ -42,11 +42,11 @@ type Downloader struct {
strips *StripCache
cfg Config
conn *connectivity.Collector
stgMgr *svcmgr.AgentPool
stgAgts *agtpool.AgentPool
selector *strategy.Selector
}

func NewDownloader(cfg Config, conn *connectivity.Collector, stgMgr *svcmgr.AgentPool, sel *strategy.Selector) Downloader {
func NewDownloader(cfg Config, conn *connectivity.Collector, stgAgts *agtpool.AgentPool, sel *strategy.Selector) Downloader {
if cfg.MaxStripCacheCount == 0 {
cfg.MaxStripCacheCount = DefaultMaxStripCacheCount
}
@@ -56,7 +56,7 @@ func NewDownloader(cfg Config, conn *connectivity.Collector, stgMgr *svcmgr.Agen
strips: ch,
cfg: cfg,
conn: conn,
stgMgr: stgMgr,
stgAgts: stgAgts,
selector: sel,
}
}


+ 1
- 1
common/pkgs/downloader/iterator.go View File

@@ -136,7 +136,7 @@ func (i *DownloadObjectIterator) downloadDirect(req downloadReqeust2, strg strat
}

exeCtx := exec.NewExecContext()
exec.SetValueByType(exeCtx, i.downloader.stgMgr)
exec.SetValueByType(exeCtx, i.downloader.stgAgts)
exec := plans.Execute(exeCtx)
go exec.Wait(context.TODO())



+ 1
- 1
common/pkgs/downloader/lrc_strip_iterator.go View File

@@ -114,7 +114,7 @@ func (s *LRCStripIterator) downloading() {
}

exeCtx := exec.NewExecContext()
exec.SetValueByType(exeCtx, s.downloder.stgMgr)
exec.SetValueByType(exeCtx, s.downloder.stgAgts)

exec := plans.Execute(exeCtx)



+ 1
- 1
common/pkgs/downloader/strip_iterator.go View File

@@ -218,7 +218,7 @@ func (s *StripIterator) readStrip(stripIndex int64, buf []byte) (int, error) {
}

exeCtx := exec.NewExecContext()
exec.SetValueByType(exeCtx, s.downloader.stgMgr)
exec.SetValueByType(exeCtx, s.downloader.stgAgts)
exec := plans.Execute(exeCtx)

ctx, cancel := context.WithCancel(context.Background())


+ 3
- 3
common/pkgs/ioswitch2/ops2/bypass.go View File

@@ -6,7 +6,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

@@ -44,12 +44,12 @@ type BypassToShardStore struct {
}

func (o *BypassToShardStore) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
svcMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx)
stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx)
if err != nil {
return err
}

shardStore, err := svcMgr.GetShardStore(o.StorageID)
shardStore, err := stgAgts.GetShardStore(o.StorageID)
if err != nil {
return err
}


+ 4
- 4
common/pkgs/ioswitch2/ops2/faas.go View File

@@ -7,7 +7,7 @@ import (
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

@@ -20,17 +20,17 @@ type InternalFaaSGalMultiply struct {
}

func (o *InternalFaaSGalMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
stgMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx)
stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx)
if err != nil {
return err
}

fass, err := svcmgr.GetComponent[types.InternalFaaSCall](stgMgr, o.StorageID)
fass, err := agtpool.GetComponent[types.InternalFaaSCall](stgAgts, o.StorageID)
if err != nil {
return fmt.Errorf("getting faas component: %w", err)
}

tmp, err := svcmgr.GetComponent[types.TempStore](stgMgr, o.StorageID)
tmp, err := agtpool.GetComponent[types.TempStore](stgAgts, o.StorageID)
if err != nil {
return fmt.Errorf("getting temp store component: %w", err)
}


+ 5
- 5
common/pkgs/ioswitch2/ops2/shard_store.go View File

@@ -12,7 +12,7 @@ import (
"gitlink.org.cn/cloudream/common/utils/io2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

@@ -42,12 +42,12 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("reading from shard store")
defer logger.Debugf("reading from shard store finished")

stgMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx)
stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}

store, err := stgMgr.GetShardStore(o.StorageID)
store, err := stgAgts.GetShardStore(o.StorageID)
if err != nil {
return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err)
}
@@ -84,12 +84,12 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("writting file to shard store")
defer logger.Debugf("write to shard store finished")

stgMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx)
stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}

store, err := stgMgr.GetShardStore(o.StorageID)
store, err := stgAgts.GetShardStore(o.StorageID)
if err != nil {
return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err)
}


+ 3
- 3
common/pkgs/ioswitch2/ops2/shared_store.go View File

@@ -8,7 +8,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
)

func init() {
@@ -27,12 +27,12 @@ func (o *SharedLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("load file to shared store")
defer logger.Debugf("load file to shared store finished")

stgMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx)
stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}

store, err := stgMgr.GetSharedStore(o.StorageID)
store, err := stgAgts.GetSharedStore(o.StorageID)
if err != nil {
return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err)
}


+ 5
- 5
common/pkgs/ioswitchlrc/ops2/shard_store.go View File

@@ -11,7 +11,7 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

@@ -41,12 +41,12 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("reading from shard store")
defer logger.Debugf("reading from shard store finished")

stgMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx)
stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}

store, err := stgMgr.GetShardStore(o.StorageID)
store, err := stgAgts.GetShardStore(o.StorageID)
if err != nil {
return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err)
}
@@ -83,12 +83,12 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("writting file to shard store")
defer logger.Debugf("write to shard store finished")

stgMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx)
stgAgts, err := exec.GetValueByType[*agtpool.AgentPool](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}

store, err := stgMgr.GetShardStore(o.StorageID)
store, err := stgAgts.GetShardStore(o.StorageID)
if err != nil {
return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err)
}


common/pkgs/storage/svcmgr/mgr.go → common/pkgs/storage/agtpool/pool.go View File

@@ -1,4 +1,4 @@
package svcmgr
package agtpool

import (
"fmt"

+ 0
- 80
common/pkgs/storage/cos/multiPartUploader.go View File

@@ -1,80 +0,0 @@
package cos

import (
"context"
"fmt"
"io"
"net/http"
"net/url"

"github.com/tencentyun/cos-go-sdk-v5"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type MultiPartUploader struct {
client *cos.Client
}

func NewMultiPartUpload(address *cdssdk.COSType) *MultiPartUploader {
// cos的endpoint已包含bucket名,会自动将桶解析出来
u, _ := url.Parse(address.Endpoint)
b := &cos.BaseURL{BucketURL: u}
client := cos.NewClient(b, &http.Client{
Transport: &cos.AuthorizationTransport{
SecretID: address.AK,
SecretKey: address.SK,
},
})

return &MultiPartUploader{
client: client,
}
}

func (c *MultiPartUploader) Initiate(objectName string) (string, error) {
v, _, err := c.client.Object.InitiateMultipartUpload(context.Background(), objectName, nil)
if err != nil {
return "", fmt.Errorf("failed to initiate multipart upload: %w", err)
}
return v.UploadID, nil
}

func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadedPartInfo, error) {
resp, err := c.client.Object.UploadPart(
context.Background(), key, uploadID, partNumber, stream, nil,
)
if err != nil {
return nil, fmt.Errorf("failed to upload part: %w", err)
}

result := &types.UploadedPartInfo{
ETag: resp.Header.Get("ETag"),
PartNumber: partNumber,
}
return result, nil
}

func (c *MultiPartUploader) Complete(uploadID string, key string, parts []*types.UploadedPartInfo) error {
opt := &cos.CompleteMultipartUploadOptions{}
for i := 0; i < len(parts); i++ {
opt.Parts = append(opt.Parts, cos.Object{
PartNumber: parts[i].PartNumber, ETag: parts[i].ETag},
)
}
_, _, err := c.client.Object.CompleteMultipartUpload(
context.Background(), key, uploadID, opt,
)
if err != nil {
return err
}

return nil
}
func (c *MultiPartUploader) Abort() {

}

func (c *MultiPartUploader) Close() {

}

common/pkgs/storage/local/service.go → common/pkgs/storage/local/agent.go View File

@@ -5,13 +5,13 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type Agent struct {
type agent struct {
Detail stgmod.StorageDetail
ShardStore *ShardStore
SharedStore *SharedStore
}

func (s *Agent) Start(ch *types.StorageEventChan) {
func (s *agent) Start(ch *types.StorageEventChan) {
if s.ShardStore != nil {
s.ShardStore.Start(ch)
}
@@ -21,7 +21,7 @@ func (s *Agent) Start(ch *types.StorageEventChan) {
}
}

func (s *Agent) Stop() {
func (s *agent) Stop() {
if s.ShardStore != nil {
s.ShardStore.Stop()
}
@@ -31,11 +31,11 @@ func (s *Agent) Stop() {
}
}

func (s *Agent) Info() stgmod.StorageDetail {
func (s *agent) Info() stgmod.StorageDetail {
return s.Detail
}

func (a *Agent) GetShardStore() (types.ShardStore, error) {
func (a *agent) GetShardStore() (types.ShardStore, error) {
if a.ShardStore == nil {
return nil, types.ErrUnsupported
}
@@ -43,7 +43,7 @@ func (a *Agent) GetShardStore() (types.ShardStore, error) {
return a.ShardStore, nil
}

func (a *Agent) GetSharedStore() (types.SharedStore, error) {
func (a *agent) GetSharedStore() (types.SharedStore, error) {
if a.SharedStore == nil {
return nil, types.ErrUnsupported
}

+ 1
- 1
common/pkgs/storage/local/local.go View File

@@ -18,7 +18,7 @@ func init() {
type builder struct{}

func (b *builder) CreateAgent(detail stgmod.StorageDetail) (types.StorageAgent, error) {
agt := &Agent{
agt := &agent{
Detail: detail,
}



+ 2
- 2
common/pkgs/storage/local/shard_store.go View File

@@ -23,7 +23,7 @@ const (
)

type ShardStore struct {
agt *Agent
agt *agent
cfg cdssdk.LocalShardStorage
absRoot string
lock sync.Mutex
@@ -31,7 +31,7 @@ type ShardStore struct {
done chan any
}

func NewShardStore(svc *Agent, cfg cdssdk.LocalShardStorage) (*ShardStore, error) {
func NewShardStore(svc *agent, cfg cdssdk.LocalShardStorage) (*ShardStore, error) {
absRoot, err := filepath.Abs(cfg.Root)
if err != nil {
return nil, fmt.Errorf("get abs root: %w", err)


+ 2
- 2
common/pkgs/storage/local/shared_store.go View File

@@ -11,11 +11,11 @@ import (
)

type SharedStore struct {
agt *Agent
agt *agent
cfg cdssdk.LocalSharedStorage
}

func NewSharedStore(agt *Agent, cfg cdssdk.LocalSharedStorage) (*SharedStore, error) {
func NewSharedStore(agt *agent, cfg cdssdk.LocalSharedStorage) (*SharedStore, error) {
return &SharedStore{
agt: agt,
cfg: cfg,


+ 0
- 1
common/pkgs/storage/obs/faas.go View File

@@ -1 +0,0 @@
package obs

+ 0
- 90
common/pkgs/storage/obs/multiPartUploader.go View File

@@ -1,90 +0,0 @@
package obs

import (
"fmt"
"io"

"github.com/huaweicloud/huaweicloud-sdk-go-obs/obs"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type MultiPartUploader struct {
client *obs.ObsClient
bucket string
}

func NewMultiPartUpload(address *cdssdk.OBSType) *MultiPartUploader {
client, err := obs.New(address.AK, address.SK, address.Endpoint)
if err != nil {
log.Fatalf("Error: %v", err)
}

return &MultiPartUploader{
client: client,
bucket: address.Bucket,
}
}

func (c *MultiPartUploader) Initiate(objectName string) (string, error) {
input := &obs.InitiateMultipartUploadInput{}
input.Bucket = c.bucket
input.Key = objectName
imur, err := c.client.InitiateMultipartUpload(input)
if err != nil {
return "", fmt.Errorf("failed to initiate multipart upload: %w", err)
}
return imur.UploadId, nil
}

func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadedPartInfo, error) {
uploadParam := &obs.UploadPartInput{
Bucket: c.bucket,
Key: key,
UploadId: uploadID,
PartSize: partSize,
PartNumber: partNumber,
Body: stream,
}

part, err := c.client.UploadPart(uploadParam)
if err != nil {
return nil, fmt.Errorf("failed to upload part: %w", err)
}
result := &types.UploadedPartInfo{
ETag: part.ETag,
PartNumber: partNumber,
}
return result, nil
}

func (c *MultiPartUploader) Complete(uploadID string, Key string, parts []*types.UploadedPartInfo) error {
var uploadPart []obs.Part
for i := 0; i < len(parts); i++ {
uploadPart = append(uploadPart, obs.Part{
PartNumber: parts[i].PartNumber,
ETag: parts[i].ETag,
})
}

notifyParam := &obs.CompleteMultipartUploadInput{
Bucket: c.bucket,
Key: Key,
UploadId: uploadID,
Parts: uploadPart,
}

_, err := c.client.CompleteMultipartUpload(notifyParam)
if err != nil {
return err
}
return nil
}
func (c *MultiPartUploader) Abort() {

}

func (c *MultiPartUploader) Close() {

}

+ 0
- 1
common/pkgs/storage/obs/obs.go View File

@@ -1 +0,0 @@
package obs

+ 0
- 88
common/pkgs/storage/oss/multiPartUploader.go View File

@@ -1,88 +0,0 @@
package oss

import (
"fmt"
"io"
"log"

"github.com/aliyun/aliyun-oss-go-sdk/oss"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type MultiPartUploader struct {
client *oss.Client
bucket *oss.Bucket
}

func NewMultiPartUpload(address *cdssdk.OSSType) *MultiPartUploader {
// 创建OSSClient实例。
client, err := oss.New(address.Endpoint, address.AK, address.SK)
if err != nil {
log.Fatalf("Error: %v", err)
}

bucket, err := client.Bucket(address.Bucket)
if err != nil {
log.Fatalf("Error: %v", err)
}

return &MultiPartUploader{
client: client,
bucket: bucket,
}
}

func (c *MultiPartUploader) Initiate(objectName string) (string, error) {
imur, err := c.bucket.InitiateMultipartUpload(objectName)
if err != nil {
return "", fmt.Errorf("failed to initiate multipart upload: %w", err)
}
return imur.UploadID, nil
}

func (c *MultiPartUploader) UploadPart(uploadID string, key string, partSize int64, partNumber int, stream io.Reader) (*types.UploadedPartInfo, error) {
uploadParam := oss.InitiateMultipartUploadResult{
UploadID: uploadID,
Key: key,
Bucket: c.bucket.BucketName,
}
part, err := c.bucket.UploadPart(uploadParam, stream, partSize, partNumber)
if err != nil {
return nil, fmt.Errorf("failed to upload part: %w", err)
}
result := &types.UploadedPartInfo{
ETag: part.ETag,
PartNumber: partNumber,
}
return result, nil
}

func (c *MultiPartUploader) Complete(uploadID string, Key string, parts []*types.UploadedPartInfo) error {
notifyParam := oss.InitiateMultipartUploadResult{
UploadID: uploadID,
Key: Key,
Bucket: c.bucket.BucketName,
}
var uploadPart []oss.UploadPart
for i := 0; i < len(parts); i++ {
uploadPart = append(uploadPart, oss.UploadPart{
PartNumber: parts[i].PartNumber,
ETag: parts[i].ETag,
})
}
_, err := c.bucket.CompleteMultipartUpload(notifyParam, uploadPart)
if err != nil {
return err
}
return nil
}

func (c *MultiPartUploader) Abort() {

}

func (c *MultiPartUploader) Close() {
// 关闭client

}

common/pkgs/storage/s3/service.go → common/pkgs/storage/s3/agent.go View File


+ 2
- 2
common/pkgs/storage/s3/s3_test.go View File

@@ -19,8 +19,8 @@ func Test_S3(t *testing.T) {
Convey("OBS", t, func() {
cli, bkt, err := createS3Client(&cdssdk.OBSType{
Region: "cn-north-4",
AK: "CANMDYKXIWRDR0IYDB32",
SK: "V67yEYpu7ol2NT8nhLlNF1g9k5hq2VwIP5N5jIoQ",
AK: "*",
SK: "*",
Endpoint: "https://obs.cn-north-4.myhuaweicloud.com",
Bucket: "pcm3-bucket3",
})


+ 1
- 1
common/pkgs/uploader/create_load.go View File

@@ -56,7 +56,7 @@ func (u *CreateLoadUploader) Upload(pa string, size int64, stream io.Reader) err
}

exeCtx := exec.NewExecContext()
exec.SetValueByType(exeCtx, u.uploader.stgMgr)
exec.SetValueByType(exeCtx, u.uploader.stgAgts)
exec := plans.Execute(exeCtx)
exec.BeginWrite(io.NopCloser(stream), hd)
ret, err := exec.Wait(context.TODO())


+ 1
- 1
common/pkgs/uploader/update.go View File

@@ -61,7 +61,7 @@ func (w *UpdateUploader) Upload(pat string, size int64, stream io.Reader) error
}

exeCtx := exec.NewExecContext()
exec.SetValueByType(exeCtx, w.uploader.stgMgr)
exec.SetValueByType(exeCtx, w.uploader.stgAgts)
exec := plans.Execute(exeCtx)
exec.BeginWrite(io.NopCloser(stream), hd)
ret, err := exec.Wait(context.TODO())


+ 4
- 4
common/pkgs/uploader/uploader.go View File

@@ -16,23 +16,23 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage/common/pkgs/metacache"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
)

type Uploader struct {
distlock *distlock.Service
connectivity *connectivity.Collector
stgMgr *svcmgr.AgentPool
stgAgts *agtpool.AgentPool
stgMeta *metacache.StorageMeta
loadTo []cdssdk.StorageID
loadToPath []string
}

func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgMgr *svcmgr.AgentPool, stgMeta *metacache.StorageMeta) *Uploader {
func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgAgts *agtpool.AgentPool, stgMeta *metacache.StorageMeta) *Uploader {
return &Uploader{
distlock: distlock,
connectivity: connectivity,
stgMgr: stgMgr,
stgAgts: stgAgts,
stgMeta: stgMeta,
}
}


+ 4
- 4
scanner/internal/event/event.go View File

@@ -9,13 +9,13 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/typedispatcher"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"
scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
)

type ExecuteArgs struct {
DB *db2.DB
DistLock *distlock.Service
StgMgr *svcmgr.AgentPool
StgMgr *agtpool.AgentPool
}

type Executor = event.Executor[ExecuteArgs]
@@ -26,11 +26,11 @@ type Event = event.Event[ExecuteArgs]

type ExecuteOption = event.ExecuteOption

func NewExecutor(db *db2.DB, distLock *distlock.Service, stgMgr *svcmgr.AgentPool) Executor {
func NewExecutor(db *db2.DB, distLock *distlock.Service, stgAgts *agtpool.AgentPool) Executor {
return event.NewExecutor(ExecuteArgs{
DB: db,
DistLock: distLock,
StgMgr: stgMgr,
StgMgr: stgAgts,
})
}



+ 3
- 3
scanner/main.go View File

@@ -10,7 +10,7 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
"gitlink.org.cn/cloudream/storage/scanner/internal/config"
"gitlink.org.cn/cloudream/storage/scanner/internal/event"
"gitlink.org.cn/cloudream/storage/scanner/internal/mq"
@@ -48,10 +48,10 @@ func main() {
go serveDistLock(distlockSvc)

// 启动存储服务管理器
stgMgr := svcmgr.NewPool()
stgAgts := agtpool.NewPool()

// 启动事件执行器
eventExecutor := event.NewExecutor(db, distlockSvc, stgMgr)
eventExecutor := event.NewExecutor(db, distlockSvc, stgAgts)
go serveEventExecutor(&eventExecutor)

agtSvr, err := scmq.NewServer(mq.NewService(&eventExecutor), config.Cfg().RabbitMQ)


Loading…
Cancel
Save