Browse Source

优化storagefactory相关数据结构

gitlink
Sydonian 1 year ago
parent
commit
6adec36573
29 changed files with 234 additions and 249 deletions
  1. +8
    -8
      agent/internal/cmd/serve.go
  2. +2
    -2
      agent/internal/grpc/service.go
  3. +2
    -2
      agent/internal/http/service.go
  4. +2
    -2
      agent/internal/mq/service.go
  5. +2
    -2
      agent/internal/task/task.go
  6. +2
    -2
      client/internal/task/task.go
  7. +1
    -1
      client/main.go
  8. +2
    -2
      common/pkgs/downloader/downloader.go
  9. +1
    -1
      common/pkgs/ioswitch2/ops2/bypass.go
  10. +3
    -1
      common/pkgs/ioswitch2/ops2/faas.go
  11. +12
    -2
      common/pkgs/ioswitch2/ops2/multipart.go
  12. +2
    -2
      common/pkgs/ioswitch2/ops2/shard_store.go
  13. +1
    -1
      common/pkgs/ioswitch2/ops2/shared_store.go
  14. +2
    -2
      common/pkgs/ioswitchlrc/ops2/shard_store.go
  15. +2
    -32
      common/pkgs/storage/factory/factory.go
  16. +2
    -5
      common/pkgs/storage/factory/reg/reg.go
  17. +28
    -32
      common/pkgs/storage/local/local.go
  18. +23
    -29
      common/pkgs/storage/local/service.go
  19. +4
    -4
      common/pkgs/storage/local/shard_store.go
  20. +4
    -4
      common/pkgs/storage/local/shared_store.go
  21. +37
    -41
      common/pkgs/storage/s3/s3.go
  22. +19
    -23
      common/pkgs/storage/s3/service.go
  23. +2
    -2
      common/pkgs/storage/s3/shard_store.go
  24. +36
    -27
      common/pkgs/storage/svcmgr/mgr.go
  25. +7
    -0
      common/pkgs/storage/types/s2s.go
  26. +23
    -15
      common/pkgs/storage/types/types.go
  27. +2
    -2
      common/pkgs/uploader/uploader.go
  28. +2
    -2
      scanner/internal/event/event.go
  29. +1
    -1
      scanner/main.go

+ 8
- 8
agent/internal/cmd/serve.go View File

@@ -55,9 +55,9 @@ func serve(configPath string) {
hubCfg := downloadHubConfig()

// 初始化存储服务管理器
stgMgr := svcmgr.NewManager()
stgAgts := svcmgr.NewPool()
for _, stg := range hubCfg.Storages {
err := stgMgr.CreateService(stg)
err := stgAgts.SetupAgent(stg)
if err != nil {
fmt.Printf("init storage %v: %v", stg.Storage.String(), err)
os.Exit(1)
@@ -68,7 +68,7 @@ func serve(configPath string) {
worker := exec.NewWorker()

// 初始化HTTP服务
httpSvr, err := http.NewServer(config.Cfg().ListenAddr, http.NewService(&worker, stgMgr))
httpSvr, err := http.NewServer(config.Cfg().ListenAddr, http.NewService(&worker, stgAgts))
if err != nil {
logger.Fatalf("new http server failed, err: %s", err.Error())
}
@@ -133,17 +133,17 @@ func serve(configPath string) {
strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta)

// 初始化下载器
dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgMgr, strgSel)
dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgAgts, strgSel)

// 初始化上传器
uploader := uploader.NewUploader(distlock, &conCol, stgMgr, stgMeta)
uploader := uploader.NewUploader(distlock, &conCol, stgAgts, stgMeta)

// 初始化任务管理器
taskMgr := task.NewManager(distlock, &conCol, &dlder, acStat, stgMgr, uploader)
taskMgr := task.NewManager(distlock, &conCol, &dlder, acStat, stgAgts, uploader)

// 启动命令服务器
// TODO 需要设计AgentID持久化机制
agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr, stgMgr), config.Cfg().ID, config.Cfg().RabbitMQ)
agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr, stgAgts), config.Cfg().ID, config.Cfg().RabbitMQ)
if err != nil {
logger.Fatalf("new agent server failed, err: %s", err.Error())
}
@@ -159,7 +159,7 @@ func serve(configPath string) {
logger.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error())
}
s := grpc.NewServer()
agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&worker, stgMgr))
agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&worker, stgAgts))
go serveGRPC(s, lis)

go serveDistLock(distlock)


+ 2
- 2
agent/internal/grpc/service.go View File

@@ -9,10 +9,10 @@ import (
type Service struct {
agentserver.AgentServer
swWorker *exec.Worker
stgMgr *svcmgr.Manager
stgMgr *svcmgr.AgentPool
}

func NewService(swWorker *exec.Worker, stgMgr *svcmgr.Manager) *Service {
func NewService(swWorker *exec.Worker, stgMgr *svcmgr.AgentPool) *Service {
return &Service{
swWorker: swWorker,
stgMgr: stgMgr,


+ 2
- 2
agent/internal/http/service.go View File

@@ -7,10 +7,10 @@ import (

type Service struct {
swWorker *exec.Worker
stgMgr *svcmgr.Manager
stgMgr *svcmgr.AgentPool
}

func NewService(swWorker *exec.Worker, stgMgr *svcmgr.Manager) *Service {
func NewService(swWorker *exec.Worker, stgMgr *svcmgr.AgentPool) *Service {
return &Service{
swWorker: swWorker,
stgMgr: stgMgr,


+ 2
- 2
agent/internal/mq/service.go View File

@@ -7,10 +7,10 @@ import (

type Service struct {
taskManager *task.Manager
stgMgr *svcmgr.Manager
stgMgr *svcmgr.AgentPool
}

func NewService(taskMgr *task.Manager, stgMgr *svcmgr.Manager) *Service {
func NewService(taskMgr *task.Manager, stgMgr *svcmgr.AgentPool) *Service {
return &Service{
taskManager: taskMgr,
stgMgr: stgMgr,


+ 2
- 2
agent/internal/task/task.go View File

@@ -16,7 +16,7 @@ type TaskContext struct {
connectivity *connectivity.Collector
downloader *downloader.Downloader
accessStat *accessstat.AccessStat
stgMgr *svcmgr.Manager
stgMgr *svcmgr.AgentPool
uploader *uploader.Uploader
}

@@ -35,7 +35,7 @@ type Task = task.Task[TaskContext]
// CompleteOption 类型定义了任务完成时的选项,可用于定制化任务完成的处理方式
type CompleteOption = task.CompleteOption

func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgMgr *svcmgr.Manager, uploader *uploader.Uploader) Manager {
func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgMgr *svcmgr.AgentPool, uploader *uploader.Uploader) Manager {
return task.NewManager(TaskContext{
distlock: distlock,
connectivity: connectivity,


+ 2
- 2
client/internal/task/task.go View File

@@ -11,7 +11,7 @@ import (
type TaskContext struct {
distlock *distlock.Service
connectivity *connectivity.Collector
stgMgr *svcmgr.Manager
stgMgr *svcmgr.AgentPool
}

// CompleteFn 类型定义了任务完成时的回调函数,用于设置任务的执行结果
@@ -31,7 +31,7 @@ type CompleteOption = task.CompleteOption

// NewManager 创建一个新的任务管理器实例,接受一个分布式锁服务和一个网络连接状态收集器作为参数
// 返回一个初始化好的任务管理器实例
func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, stgMgr *svcmgr.Manager) Manager {
func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, stgMgr *svcmgr.AgentPool) Manager {
return task.NewManager(TaskContext{
distlock: distlock,
connectivity: connectivity,


+ 1
- 1
client/main.go View File

@@ -99,7 +99,7 @@ func main() {
go serveAccessStat(acStat)

// 存储管理器
stgMgr := svcmgr.NewManager()
stgMgr := svcmgr.NewPool()

// 任务管理器
taskMgr := task.NewManager(distlockSvc, &conCol, stgMgr)


+ 2
- 2
common/pkgs/downloader/downloader.go View File

@@ -42,11 +42,11 @@ type Downloader struct {
strips *StripCache
cfg Config
conn *connectivity.Collector
stgMgr *svcmgr.Manager
stgMgr *svcmgr.AgentPool
selector *strategy.Selector
}

func NewDownloader(cfg Config, conn *connectivity.Collector, stgMgr *svcmgr.Manager, sel *strategy.Selector) Downloader {
func NewDownloader(cfg Config, conn *connectivity.Collector, stgMgr *svcmgr.AgentPool, sel *strategy.Selector) Downloader {
if cfg.MaxStripCacheCount == 0 {
cfg.MaxStripCacheCount = DefaultMaxStripCacheCount
}


+ 1
- 1
common/pkgs/ioswitch2/ops2/bypass.go View File

@@ -44,7 +44,7 @@ type BypassToShardStore struct {
}

func (o *BypassToShardStore) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
svcMgr, err := exec.GetValueByType[*svcmgr.Manager](ctx)
svcMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx)
if err != nil {
return err
}


+ 3
- 1
common/pkgs/ioswitch2/ops2/faas.go View File

@@ -1,5 +1,6 @@
package ops2

/*
import (
"fmt"

@@ -19,7 +20,7 @@ type InternalFaaSGalMultiply struct {
}

func (o *InternalFaaSGalMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
stgMgr, err := exec.GetValueByType[*svcmgr.Manager](ctx)
stgMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx)
if err != nil {
return err
}
@@ -58,3 +59,4 @@ func (o *InternalFaaSGalMultiply) Execute(ctx *exec.ExecContext, e *exec.Executo
exec.PutArray(e, o.OutputFilePathes, outputVars)
return nil
}
*/

+ 12
- 2
common/pkgs/ioswitch2/ops2/multipart.go View File

@@ -48,7 +48,12 @@ type MultipartInitiator struct {
}

func (o *MultipartInitiator) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
initiator, err := factory.CreateComponent[types.MultipartInitiator](o.Storage)
blder := factory.GetBuilder(o.Storage)
if blder == nil {
return fmt.Errorf("unsupported storage type: %T", o.Storage.Storage.Type)
}

initiator, err := blder.CreateMultipartInitiator(o.Storage)
if err != nil {
return err
}
@@ -113,6 +118,11 @@ type MultipartUpload struct {
}

func (o *MultipartUpload) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
blder := factory.GetBuilder(o.Storage)
if blder == nil {
return fmt.Errorf("unsupported storage type: %T", o.Storage.Storage.Type)
}

uploadArgs, err := exec.BindVar[*MultipartUploadArgsValue](e, ctx.Context, o.UploadArgs)
if err != nil {
return err
@@ -124,7 +134,7 @@ func (o *MultipartUpload) Execute(ctx *exec.ExecContext, e *exec.Executor) error
}
defer partStr.Stream.Close()

uploader, err := factory.CreateComponent[types.MultipartUploader](o.Storage)
uploader, err := blder.CreateMultipartUploader(o.Storage)
if err != nil {
return err
}


+ 2
- 2
common/pkgs/ioswitch2/ops2/shard_store.go View File

@@ -42,7 +42,7 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("reading from shard store")
defer logger.Debugf("reading from shard store finished")

stgMgr, err := exec.GetValueByType[*svcmgr.Manager](ctx)
stgMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}
@@ -84,7 +84,7 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("writting file to shard store")
defer logger.Debugf("write to shard store finished")

stgMgr, err := exec.GetValueByType[*svcmgr.Manager](ctx)
stgMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}


+ 1
- 1
common/pkgs/ioswitch2/ops2/shared_store.go View File

@@ -27,7 +27,7 @@ func (o *SharedLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("load file to shared store")
defer logger.Debugf("load file to shared store finished")

stgMgr, err := exec.GetValueByType[*svcmgr.Manager](ctx)
stgMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}


+ 2
- 2
common/pkgs/ioswitchlrc/ops2/shard_store.go View File

@@ -41,7 +41,7 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("reading from shard store")
defer logger.Debugf("reading from shard store finished")

stgMgr, err := exec.GetValueByType[*svcmgr.Manager](ctx)
stgMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}
@@ -83,7 +83,7 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("writting file to shard store")
defer logger.Debugf("write to shard store finished")

stgMgr, err := exec.GetValueByType[*svcmgr.Manager](ctx)
stgMgr, err := exec.GetValueByType[*svcmgr.AgentPool](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}


+ 2
- 32
common/pkgs/storage/factory/factory.go View File

@@ -1,10 +1,8 @@
package factory

import (
"fmt"
"reflect"

"gitlink.org.cn/cloudream/common/utils/reflect2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
@@ -14,35 +12,7 @@ import (
_ "gitlink.org.cn/cloudream/storage/common/pkgs/storage/s3"
)

func CreateService(detail stgmod.StorageDetail) (types.StorageService, error) {
func GetBuilder(detail stgmod.StorageDetail) types.StorageBuilder {
typ := reflect.TypeOf(detail.Storage.Type)
bld, ok := reg.StorageBuilders[typ]
if !ok {
return nil, fmt.Errorf("unsupported storage type: %T", detail.Storage.Type)
}

return bld.CreateService(detail)
}

func CreateComponent[T any](detail stgmod.StorageDetail) (T, error) {
typ := reflect.TypeOf(detail.Storage.Type)
bld, ok := reg.StorageBuilders[typ]
if !ok {
var def T
return def, fmt.Errorf("unsupported storage type: %T", detail.Storage.Type)
}

comp, err := bld.CreateComponent(detail, reflect2.TypeOf[T]())
if err != nil {
var def T
return def, err
}

c, ok := comp.(T)
if !ok {
var def T
return def, fmt.Errorf("invalid component type: %T", comp)
}

return c, nil
return reg.StorageBuilders[typ]
}

+ 2
- 5
common/pkgs/storage/factory/reg/reg.go View File

@@ -11,9 +11,6 @@ import (
var StorageBuilders = make(map[reflect.Type]types.StorageBuilder)

// 注册针对指定存储服务类型的Builder
func RegisterBuilder[T cdssdk.StorageType](createSvc types.StorageServiceBuilder, createComp types.StorageComponentBuilder) {
StorageBuilders[reflect2.TypeOf[T]()] = types.StorageBuilder{
CreateService: createSvc,
CreateComponent: createComp,
}
func RegisterBuilder[T cdssdk.StorageType](builder types.StorageBuilder) {
StorageBuilders[reflect2.TypeOf[T]()] = builder
}

+ 28
- 32
common/pkgs/storage/local/local.go View File

@@ -3,10 +3,8 @@ package local
import (
"fmt"
"path/filepath"
"reflect"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/reflect2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
@@ -14,11 +12,13 @@ import (
)

func init() {
reg.RegisterBuilder[*cdssdk.LocalStorageType](createService, createComponent)
reg.RegisterBuilder[*cdssdk.LocalStorageType](&builder{})
}

func createService(detail stgmod.StorageDetail) (types.StorageService, error) {
svc := &Service{
type builder struct{}

func (b *builder) CreateAgent(detail stgmod.StorageDetail) (types.StorageAgent, error) {
agt := &Agent{
Detail: detail,
}

@@ -28,12 +28,12 @@ func createService(detail stgmod.StorageDetail) (types.StorageService, error) {
return nil, fmt.Errorf("invalid shard store type %T for local storage", detail.Storage.ShardStore)
}

store, err := NewShardStore(svc, *local)
store, err := NewShardStore(agt, *local)
if err != nil {
return nil, err
}

svc.ShardStore = store
agt.ShardStore = store
}

if detail.Storage.SharedStore != nil {
@@ -42,42 +42,38 @@ func createService(detail stgmod.StorageDetail) (types.StorageService, error) {
return nil, fmt.Errorf("invalid shared store type %T for local storage", detail.Storage.SharedStore)
}

store, err := NewSharedStore(svc, *local)
store, err := NewSharedStore(agt, *local)
if err != nil {
return nil, err
}

svc.SharedStore = store
agt.SharedStore = store
}

return svc, nil
return agt, nil
}

func createComponent(detail stgmod.StorageDetail, typ reflect.Type) (any, error) {
switch typ {
case reflect2.TypeOf[types.MultipartInitiator]():
feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](detail)
if feat == nil {
return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{})
}

absTempDir, err := filepath.Abs(feat.TempDir)
if err != nil {
return nil, fmt.Errorf("get abs temp dir %v: %v", feat.TempDir, err)
}
func (b *builder) CreateMultipartInitiator(detail stgmod.StorageDetail) (types.MultipartInitiator, error) {
feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](detail)
if feat == nil {
return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{})
}

return &MultipartInitiator{
absTempDir: absTempDir,
}, nil
absTempDir, err := filepath.Abs(feat.TempDir)
if err != nil {
return nil, fmt.Errorf("get abs temp dir %v: %v", feat.TempDir, err)
}

case reflect2.TypeOf[types.MultipartUploader]():
feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](detail)
if feat == nil {
return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{})
}
return &MultipartInitiator{
absTempDir: absTempDir,
}, nil
}

return &MultipartUploader{}, nil
func (b *builder) CreateMultipartUploader(detail stgmod.StorageDetail) (types.MultipartUploader, error) {
feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](detail)
if feat == nil {
return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{})
}

return nil, fmt.Errorf("unsupported component type %v", typ)
return &MultipartUploader{}, nil
}

+ 23
- 29
common/pkgs/storage/local/service.go View File

@@ -1,43 +1,17 @@
package local

import (
"reflect"

"gitlink.org.cn/cloudream/common/utils/reflect2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type Service struct {
type Agent struct {
Detail stgmod.StorageDetail
ShardStore *ShardStore
SharedStore *SharedStore
}

func (s *Service) Info() stgmod.StorageDetail {
return s.Detail
}

func (s *Service) GetComponent(typ reflect.Type) (any, error) {
switch typ {
case reflect2.TypeOf[types.ShardStore]():
if s.ShardStore == nil {
return nil, types.ErrComponentNotFound
}
return s.ShardStore, nil

case reflect2.TypeOf[types.SharedStore]():
if s.SharedStore == nil {
return nil, types.ErrComponentNotFound
}
return s.SharedStore, nil

default:
return nil, types.ErrComponentNotFound
}
}

func (s *Service) Start(ch *types.StorageEventChan) {
func (s *Agent) Start(ch *types.StorageEventChan) {
if s.ShardStore != nil {
s.ShardStore.Start(ch)
}
@@ -47,7 +21,7 @@ func (s *Service) Start(ch *types.StorageEventChan) {
}
}

func (s *Service) Stop() {
func (s *Agent) Stop() {
if s.ShardStore != nil {
s.ShardStore.Stop()
}
@@ -56,3 +30,23 @@ func (s *Service) Stop() {
s.SharedStore.Stop()
}
}

func (s *Agent) Info() stgmod.StorageDetail {
return s.Detail
}

func (a *Agent) GetShardStore() (types.ShardStore, error) {
if a.ShardStore == nil {
return nil, types.ErrUnsupported
}

return a.ShardStore, nil
}

func (a *Agent) GetSharedStore() (types.SharedStore, error) {
if a.SharedStore == nil {
return nil, types.ErrUnsupported
}

return a.SharedStore, nil
}

+ 4
- 4
common/pkgs/storage/local/shard_store.go View File

@@ -23,7 +23,7 @@ const (
)

type ShardStore struct {
svc *Service
agt *Agent
cfg cdssdk.LocalShardStorage
absRoot string
lock sync.Mutex
@@ -31,14 +31,14 @@ type ShardStore struct {
done chan any
}

func NewShardStore(svc *Service, cfg cdssdk.LocalShardStorage) (*ShardStore, error) {
func NewShardStore(svc *Agent, cfg cdssdk.LocalShardStorage) (*ShardStore, error) {
absRoot, err := filepath.Abs(cfg.Root)
if err != nil {
return nil, fmt.Errorf("get abs root: %w", err)
}

return &ShardStore{
svc: svc,
agt: svc,
cfg: cfg,
absRoot: absRoot,
workingTempFiles: make(map[string]bool),
@@ -411,7 +411,7 @@ func (s *ShardStore) BypassUploaded(info types.BypassFileInfo) error {
}

func (s *ShardStore) getLogger() logger.Logger {
return logger.WithField("ShardStore", "Local").WithField("Storage", s.svc.Detail.Storage.String())
return logger.WithField("ShardStore", "Local").WithField("Storage", s.agt.Detail.Storage.String())
}

func (s *ShardStore) getFileDirFromHash(hash cdssdk.FileHash) string {


+ 4
- 4
common/pkgs/storage/local/shared_store.go View File

@@ -11,13 +11,13 @@ import (
)

type SharedStore struct {
svc *Service
agt *Agent
cfg cdssdk.LocalSharedStorage
}

func NewSharedStore(svc *Service, cfg cdssdk.LocalSharedStorage) (*SharedStore, error) {
func NewSharedStore(agt *Agent, cfg cdssdk.LocalSharedStorage) (*SharedStore, error) {
return &SharedStore{
svc: svc,
agt: agt,
cfg: cfg,
}, nil
}
@@ -52,5 +52,5 @@ func (s *SharedStore) Write(objPath string, stream io.Reader) error {
}

func (s *SharedStore) getLogger() logger.Logger {
return logger.WithField("SharedStore", "Local").WithField("Storage", s.svc.Detail.Storage.String())
return logger.WithField("SharedStore", "Local").WithField("Storage", s.agt.Detail.Storage.String())
}

+ 37
- 41
common/pkgs/storage/s3/s3.go View File

@@ -2,13 +2,11 @@ package s3

import (
"fmt"
"reflect"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/s3"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/reflect2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
@@ -16,13 +14,15 @@ import (
)

func init() {
reg.RegisterBuilder[*cdssdk.COSType](createService, createComponent)
reg.RegisterBuilder[*cdssdk.OSSType](createService, createComponent)
reg.RegisterBuilder[*cdssdk.OBSType](createService, createComponent)
reg.RegisterBuilder[*cdssdk.COSType](&builder{})
reg.RegisterBuilder[*cdssdk.OSSType](&builder{})
reg.RegisterBuilder[*cdssdk.OBSType](&builder{})
}

func createService(detail stgmod.StorageDetail) (types.StorageService, error) {
svc := &Service{
type builder struct{}

func (b *builder) CreateAgent(detail stgmod.StorageDetail) (types.StorageAgent, error) {
agt := &Agent{
Detail: detail,
}

@@ -37,7 +37,7 @@ func createService(detail stgmod.StorageDetail) (types.StorageService, error) {
return nil, err
}

store, err := NewShardStore(svc, cli, bkt, *cfg, ShardStoreOption{
store, err := NewShardStore(agt, cli, bkt, *cfg, ShardStoreOption{
// 目前对接的存储服务都不支持从上传接口直接获取到Sha256
UseAWSSha256: false,
})
@@ -45,49 +45,45 @@ func createService(detail stgmod.StorageDetail) (types.StorageService, error) {
return nil, err
}

svc.ShardStore = store
agt.ShardStore = store
}

return svc, nil
return agt, nil
}

func createComponent(detail stgmod.StorageDetail, typ reflect.Type) (any, error) {
switch typ {
case reflect2.TypeOf[types.MultipartInitiator]():
feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](detail)
if feat == nil {
return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{})
}

cli, bkt, err := createS3Client(detail.Storage.Type)
if err != nil {
return nil, err
}
func (b *builder) CreateMultipartInitiator(detail stgmod.StorageDetail) (types.MultipartInitiator, error) {
feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](detail)
if feat == nil {
return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{})
}

return &MultipartInitiator{
cli: cli,
bucket: bkt,
tempDir: feat.TempDir,
}, nil
cli, bkt, err := createS3Client(detail.Storage.Type)
if err != nil {
return nil, err
}

case reflect2.TypeOf[types.MultipartUploader]():
feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](detail)
if feat == nil {
return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{})
}
return &MultipartInitiator{
cli: cli,
bucket: bkt,
tempDir: feat.TempDir,
}, nil
}

cli, bkt, err := createS3Client(detail.Storage.Type)
if err != nil {
return nil, err
}
func (b *builder) CreateMultipartUploader(detail stgmod.StorageDetail) (types.MultipartUploader, error) {
feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](detail)
if feat == nil {
return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{})
}

return &MultipartUploader{
cli: cli,
bucket: bkt,
}, nil
cli, bkt, err := createS3Client(detail.Storage.Type)
if err != nil {
return nil, err
}

return nil, fmt.Errorf("unsupported component type %v", typ)
return &MultipartUploader{
cli: cli,
bucket: bkt,
}, nil
}

func createS3Client(addr cdssdk.StorageType) (*s3.Client, string, error) {


+ 19
- 23
common/pkgs/storage/s3/service.go View File

@@ -1,43 +1,39 @@
package s3

import (
"reflect"

"gitlink.org.cn/cloudream/common/utils/reflect2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type Service struct {
type Agent struct {
Detail stgmod.StorageDetail
ShardStore *ShardStore
}

func (s *Service) Info() stgmod.StorageDetail {
return s.Detail
func (s *Agent) Start(ch *types.StorageEventChan) {
if s.ShardStore != nil {
s.ShardStore.Start(ch)
}
}

func (s *Service) GetComponent(typ reflect.Type) (any, error) {
switch typ {
case reflect2.TypeOf[types.ShardStore]():
if s.ShardStore == nil {
return nil, types.ErrComponentNotFound
}
return s.ShardStore, nil

default:
return nil, types.ErrComponentNotFound
func (a *Agent) Stop() {
if a.ShardStore != nil {
a.ShardStore.Stop()
}
}

func (s *Service) Start(ch *types.StorageEventChan) {
if s.ShardStore != nil {
s.ShardStore.Start(ch)
}
func (a *Agent) Info() stgmod.StorageDetail {
return a.Detail
}

func (s *Service) Stop() {
if s.ShardStore != nil {
s.ShardStore.Stop()
func (a *Agent) GetShardStore() (types.ShardStore, error) {
if a.ShardStore == nil {
return nil, types.ErrUnsupported
}

return a.ShardStore, nil
}

func (a *Agent) GetSharedStore() (types.SharedStore, error) {
return nil, types.ErrUnsupported
}

+ 2
- 2
common/pkgs/storage/s3/shard_store.go View File

@@ -30,7 +30,7 @@ type ShardStoreOption struct {
}

type ShardStore struct {
svc *Service
svc *Agent
cli *s3.Client
bucket string
cfg cdssdk.S3ShardStorage
@@ -40,7 +40,7 @@ type ShardStore struct {
done chan any
}

func NewShardStore(svc *Service, cli *s3.Client, bkt string, cfg cdssdk.S3ShardStorage, opt ShardStoreOption) (*ShardStore, error) {
func NewShardStore(svc *Agent, cli *s3.Client, bkt string, cfg cdssdk.S3ShardStorage, opt ShardStoreOption) (*ShardStore, error) {
return &ShardStore{
svc: svc,
cli: cli,


+ 36
- 27
common/pkgs/storage/svcmgr/mgr.go View File

@@ -1,35 +1,34 @@
package svcmgr

import (
"reflect"
"fmt"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/async"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/reflect2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type storage struct {
Service types.StorageService
Agent types.StorageAgent
}

type Manager struct {
type AgentPool struct {
storages map[cdssdk.StorageID]*storage
lock sync.Mutex
eventChan *types.StorageEventChan
}

func NewManager() *Manager {
return &Manager{
func NewPool() *AgentPool {
return &AgentPool{
storages: make(map[cdssdk.StorageID]*storage),
eventChan: async.NewUnboundChannel[types.StorageEvent](),
}
}

func (m *Manager) CreateService(detail stgmod.StorageDetail) error {
func (m *AgentPool) SetupAgent(detail stgmod.StorageDetail) error {
m.lock.Lock()
defer m.lock.Unlock()

@@ -39,19 +38,24 @@ func (m *Manager) CreateService(detail stgmod.StorageDetail) error {

stg := &storage{}

svc, err := factory.CreateService(detail)
bld := factory.GetBuilder(detail)
if bld == nil {
return fmt.Errorf("unsupported storage type: %T", detail.Storage.Type)
}

svc, err := bld.CreateAgent(detail)
if err != nil {
return err
}

stg.Service = svc
stg.Agent = svc
m.storages[detail.Storage.StorageID] = stg

svc.Start(m.eventChan)
return nil
}

func (m *Manager) GetInfo(stgID cdssdk.StorageID) (stgmod.StorageDetail, error) {
func (m *AgentPool) GetInfo(stgID cdssdk.StorageID) (stgmod.StorageDetail, error) {
m.lock.Lock()
defer m.lock.Unlock()

@@ -60,21 +64,23 @@ func (m *Manager) GetInfo(stgID cdssdk.StorageID) (stgmod.StorageDetail, error)
return stgmod.StorageDetail{}, types.ErrStorageNotFound
}

return stg.Service.Info(), nil
return stg.Agent.Info(), nil
}

// 查找指定Storage的ShardStore组件
func (m *Manager) GetShardStore(stgID cdssdk.StorageID) (types.ShardStore, error) {
return GetComponent[types.ShardStore](m, stgID)
}
func (m *AgentPool) GetAgent(stgID cdssdk.StorageID) (types.StorageAgent, error) {
m.lock.Lock()
defer m.lock.Unlock()

// 查找指定Storage的SharedStore组件
func (m *Manager) GetSharedStore(stgID cdssdk.StorageID) (types.SharedStore, error) {
return GetComponent[types.SharedStore](m, stgID)
stg := m.storages[stgID]
if stg == nil {
return nil, types.ErrStorageNotFound
}

return stg.Agent, nil
}

// 查找指定Storage的指定类型的组件,可以是ShardStore、SharedStore、或者其他自定义的组件
func (m *Manager) GetComponent(stgID cdssdk.StorageID, typ reflect.Type) (any, error) {
// 查找指定Storage的ShardStore组件
func (m *AgentPool) GetShardStore(stgID cdssdk.StorageID) (types.ShardStore, error) {
m.lock.Lock()
defer m.lock.Unlock()

@@ -83,15 +89,18 @@ func (m *Manager) GetComponent(stgID cdssdk.StorageID, typ reflect.Type) (any, e
return nil, types.ErrStorageNotFound
}

return stg.Service.GetComponent(typ)
return stg.Agent.GetShardStore()
}

func GetComponent[T any](mgr *Manager, stgID cdssdk.StorageID) (T, error) {
ret, err := mgr.GetComponent(stgID, reflect2.TypeOf[T]())
if err != nil {
var def T
return def, err
// 查找指定Storage的SharedStore组件
func (m *AgentPool) GetSharedStore(stgID cdssdk.StorageID) (types.SharedStore, error) {
m.lock.Lock()
defer m.lock.Unlock()

stg := m.storages[stgID]
if stg == nil {
return nil, types.ErrStorageNotFound
}

return ret.(T), nil
return stg.Agent.GetSharedStore()
}

+ 7
- 0
common/pkgs/storage/types/s2s.go View File

@@ -0,0 +1,7 @@
package types

import stgmod "gitlink.org.cn/cloudream/storage/common/models"

type S2STransfer interface {
Transfer(src stgmod.StorageDetail, srcPath string, dstPath string) error
}

+ 23
- 15
common/pkgs/storage/types/types.go View File

@@ -2,7 +2,6 @@ package types

import (
"errors"
"reflect"

"gitlink.org.cn/cloudream/common/pkgs/async"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
@@ -10,7 +9,8 @@ import (

var ErrStorageNotFound = errors.New("storage not found")

var ErrComponentNotFound = errors.New("component not found")
// 不支持的操作。可以作为StorageBuilder中任意函数的错误返回值,代表该操作不被支持。
var ErrUnsupported = errors.New("unsupported operation")

var ErrStorageExists = errors.New("storage already exists")

@@ -18,21 +18,29 @@ type StorageEvent interface{}

type StorageEventChan = async.UnboundChannel[StorageEvent]

// 代表一个长期运行在MasterHub上的存储服务
type StorageService interface {
Info() stgmod.StorageDetail
GetComponent(typ reflect.Type) (any, error)
/*
如果一个组件需要与Agent交互(比如实际是ShardStore功能的一部分),那么就将Create函数放到StorageAgent接口中。
如果组件十分独立,仅需要存储服务的配置信息就行,那么就把Create函数放到StorageBuilder中去。
*/

// 在MasterHub上运行,代理一个存储服务。
type StorageAgent interface {
Start(ch *StorageEventChan)
Stop()
}

// 创建一个在MasterHub上长期运行的存储服务
type StorageServiceBuilder func(detail stgmod.StorageDetail) (StorageService, error)

// 根据存储服务信息创建一个指定类型的组件
type StorageComponentBuilder func(detail stgmod.StorageDetail, typ reflect.Type) (any, error)
Info() stgmod.StorageDetail
// 获取分片存储服务
GetShardStore() (ShardStore, error)
// 获取共享存储服务
GetSharedStore() (SharedStore, error)
}

type StorageBuilder struct {
CreateService StorageServiceBuilder
CreateComponent StorageComponentBuilder
// 创建存储服务的指定组件
type StorageBuilder interface {
// 创建一个在MasterHub上长期运行的存储服务
CreateAgent(detail stgmod.StorageDetail) (StorageAgent, error)
// 创建一个分片上传功能的初始化器
CreateMultipartInitiator(detail stgmod.StorageDetail) (MultipartInitiator, error)
// 创建一个分片上传功能的上传器
CreateMultipartUploader(detail stgmod.StorageDetail) (MultipartUploader, error)
}

+ 2
- 2
common/pkgs/uploader/uploader.go View File

@@ -22,13 +22,13 @@ import (
type Uploader struct {
distlock *distlock.Service
connectivity *connectivity.Collector
stgMgr *svcmgr.Manager
stgMgr *svcmgr.AgentPool
stgMeta *metacache.StorageMeta
loadTo []cdssdk.StorageID
loadToPath []string
}

func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgMgr *svcmgr.Manager, stgMeta *metacache.StorageMeta) *Uploader {
func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgMgr *svcmgr.AgentPool, stgMeta *metacache.StorageMeta) *Uploader {
return &Uploader{
distlock: distlock,
connectivity: connectivity,


+ 2
- 2
scanner/internal/event/event.go View File

@@ -15,7 +15,7 @@ import (
type ExecuteArgs struct {
DB *db2.DB
DistLock *distlock.Service
StgMgr *svcmgr.Manager
StgMgr *svcmgr.AgentPool
}

type Executor = event.Executor[ExecuteArgs]
@@ -26,7 +26,7 @@ type Event = event.Event[ExecuteArgs]

type ExecuteOption = event.ExecuteOption

func NewExecutor(db *db2.DB, distLock *distlock.Service, stgMgr *svcmgr.Manager) Executor {
func NewExecutor(db *db2.DB, distLock *distlock.Service, stgMgr *svcmgr.AgentPool) Executor {
return event.NewExecutor(ExecuteArgs{
DB: db,
DistLock: distLock,


+ 1
- 1
scanner/main.go View File

@@ -48,7 +48,7 @@ func main() {
go serveDistLock(distlockSvc)

// 启动存储服务管理器
stgMgr := svcmgr.NewManager()
stgMgr := svcmgr.NewPool()

// 启动事件执行器
eventExecutor := event.NewExecutor(db, distlockSvc, stgMgr)


Loading…
Cancel
Save