Browse Source

优化storage相关数据结构

gitlink
Sydonian 1 year ago
parent
commit
b1112644be
32 changed files with 330 additions and 311 deletions
  1. +3
    -3
      agent/internal/cmd/serve.go
  2. +3
    -3
      agent/internal/grpc/service.go
  3. +3
    -3
      agent/internal/http/service.go
  4. +3
    -3
      agent/internal/mq/service.go
  5. +3
    -3
      agent/internal/task/task.go
  6. +3
    -3
      client/internal/task/task.go
  7. +2
    -2
      client/main.go
  8. +3
    -3
      common/pkgs/downloader/downloader.go
  9. +4
    -4
      common/pkgs/ioswitch2/ops2/faas.go
  10. +3
    -3
      common/pkgs/ioswitch2/ops2/shard_store.go
  11. +2
    -2
      common/pkgs/ioswitch2/ops2/shared_store.go
  12. +3
    -3
      common/pkgs/ioswitchlrc/ops2/shard_store.go
  13. +58
    -0
      common/pkgs/storage/factory/factory.go
  14. +53
    -0
      common/pkgs/storage/local/local.go
  15. +58
    -0
      common/pkgs/storage/local/service.go
  16. +4
    -9
      common/pkgs/storage/local/shard_store.go
  17. +4
    -9
      common/pkgs/storage/local/shared_store.go
  18. +0
    -10
      common/pkgs/storage/mgr/create_components.go
  19. +0
    -27
      common/pkgs/storage/mgr/create_shardstore.go
  20. +0
    -27
      common/pkgs/storage/mgr/create_sharedstore.go
  21. +0
    -172
      common/pkgs/storage/mgr/mgr.go
  22. +85
    -0
      common/pkgs/storage/svcmgr/mgr.go
  23. +0
    -7
      common/pkgs/storage/temp/temp_store.go
  24. +0
    -1
      common/pkgs/storage/types/bypass.go
  25. +0
    -1
      common/pkgs/storage/types/faas.go
  26. +2
    -1
      common/pkgs/storage/types/shard_store.go
  27. +2
    -1
      common/pkgs/storage/types/shared_store.go
  28. +0
    -1
      common/pkgs/storage/types/temp_store.go
  29. +21
    -2
      common/pkgs/storage/types/types.go
  30. +3
    -3
      common/pkgs/uploader/uploader.go
  31. +3
    -3
      scanner/internal/event/event.go
  32. +2
    -2
      scanner/main.go

+ 3
- 3
agent/internal/cmd/serve.go View File

@@ -20,7 +20,7 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/uploader"

"google.golang.org/grpc"
@@ -53,9 +53,9 @@ func serve(configPath string) {
hubCfg := downloadHubConfig()

// 初始化存储服务管理器
stgMgr := mgr.NewManager()
stgMgr := svcmgr.NewManager()
for _, stg := range hubCfg.Storages {
err := stgMgr.InitStorage(stg)
err := stgMgr.CreateService(stg)
if err != nil {
fmt.Printf("init storage %v: %v", stg, err)
os.Exit(1)


+ 3
- 3
agent/internal/grpc/service.go View File

@@ -3,16 +3,16 @@ package grpc
import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
agentserver "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
)

type Service struct {
agentserver.AgentServer
swWorker *exec.Worker
stgMgr *mgr.Manager
stgMgr *svcmgr.Manager
}

func NewService(swWorker *exec.Worker, stgMgr *mgr.Manager) *Service {
func NewService(swWorker *exec.Worker, stgMgr *svcmgr.Manager) *Service {
return &Service{
swWorker: swWorker,
stgMgr: stgMgr,


+ 3
- 3
agent/internal/http/service.go View File

@@ -2,15 +2,15 @@ package http

import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
)

type Service struct {
swWorker *exec.Worker
stgMgr *mgr.Manager
stgMgr *svcmgr.Manager
}

func NewService(swWorker *exec.Worker, stgMgr *mgr.Manager) *Service {
func NewService(swWorker *exec.Worker, stgMgr *svcmgr.Manager) *Service {
return &Service{
swWorker: swWorker,
stgMgr: stgMgr,


+ 3
- 3
agent/internal/mq/service.go View File

@@ -2,15 +2,15 @@ package mq

import (
"gitlink.org.cn/cloudream/storage/agent/internal/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
)

type Service struct {
taskManager *task.Manager
stgMgr *mgr.Manager
stgMgr *svcmgr.Manager
}

func NewService(taskMgr *task.Manager, stgMgr *mgr.Manager) *Service {
func NewService(taskMgr *task.Manager, stgMgr *svcmgr.Manager) *Service {
return &Service{
taskManager: taskMgr,
stgMgr: stgMgr,


+ 3
- 3
agent/internal/task/task.go View File

@@ -6,7 +6,7 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/accessstat"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/uploader"
)

@@ -16,7 +16,7 @@ type TaskContext struct {
connectivity *connectivity.Collector
downloader *downloader.Downloader
accessStat *accessstat.AccessStat
stgMgr *mgr.Manager
stgMgr *svcmgr.Manager
uploader *uploader.Uploader
}

@@ -35,7 +35,7 @@ type Task = task.Task[TaskContext]
// CompleteOption 类型定义了任务完成时的选项,可用于定制化任务完成的处理方式
type CompleteOption = task.CompleteOption

func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgMgr *mgr.Manager, uploader *uploader.Uploader) Manager {
func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgMgr *svcmgr.Manager, uploader *uploader.Uploader) Manager {
return task.NewManager(TaskContext{
distlock: distlock,
connectivity: connectivity,


+ 3
- 3
client/internal/task/task.go View File

@@ -4,14 +4,14 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/distlock" // 引入分布式锁服务
"gitlink.org.cn/cloudream/common/pkgs/task" // 引入任务处理相关的包
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" // 引入网络连接状态收集器
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
)

// TaskContext 定义了任务执行的上下文环境,包含分布式锁服务和网络连接状态收集器
type TaskContext struct {
distlock *distlock.Service
connectivity *connectivity.Collector
stgMgr *mgr.Manager
stgMgr *svcmgr.Manager
}

// CompleteFn 类型定义了任务完成时的回调函数,用于设置任务的执行结果
@@ -31,7 +31,7 @@ type CompleteOption = task.CompleteOption

// NewManager 创建一个新的任务管理器实例,接受一个分布式锁服务和一个网络连接状态收集器作为参数
// 返回一个初始化好的任务管理器实例
func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, stgMgr *mgr.Manager) Manager {
func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, stgMgr *svcmgr.Manager) Manager {
return task.NewManager(TaskContext{
distlock: distlock,
connectivity: connectivity,


+ 2
- 2
client/main.go View File

@@ -19,7 +19,7 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/uploader"
)

@@ -91,7 +91,7 @@ func main() {
go serveAccessStat(acStat)

// 存储管理器
stgMgr := mgr.NewManager()
stgMgr := svcmgr.NewManager()

// 任务管理器
taskMgr := task.NewManager(distlockSvc, &conCol, stgMgr)


+ 3
- 3
common/pkgs/downloader/downloader.go View File

@@ -11,7 +11,7 @@ import (
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
)

const (
@@ -41,10 +41,10 @@ type Downloader struct {
strips *StripCache
cfg Config
conn *connectivity.Collector
stgMgr *mgr.Manager
stgMgr *svcmgr.Manager
}

func NewDownloader(cfg Config, conn *connectivity.Collector, stgMgr *mgr.Manager) Downloader {
func NewDownloader(cfg Config, conn *connectivity.Collector, stgMgr *svcmgr.Manager) Downloader {
if cfg.MaxStripCacheCount == 0 {
cfg.MaxStripCacheCount = DefaultMaxStripCacheCount
}


+ 4
- 4
common/pkgs/ioswitch2/ops2/faas.go View File

@@ -6,7 +6,7 @@ import (
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

@@ -19,17 +19,17 @@ type InternalFaaSGalMultiply struct {
}

func (o *InternalFaaSGalMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
stgMgr, err := exec.GetValueByType[*mgr.Manager](ctx)
stgMgr, err := exec.GetValueByType[*svcmgr.Manager](ctx)
if err != nil {
return err
}

fass, err := mgr.GetComponent[types.InternalFaaSCall](stgMgr, o.StorageID)
fass, err := svcmgr.GetComponent[types.InternalFaaSCall](stgMgr, o.StorageID)
if err != nil {
return fmt.Errorf("getting faas component: %w", err)
}

tmp, err := mgr.GetComponent[types.TempStore](stgMgr, o.StorageID)
tmp, err := svcmgr.GetComponent[types.TempStore](stgMgr, o.StorageID)
if err != nil {
return fmt.Errorf("getting temp store component: %w", err)
}


+ 3
- 3
common/pkgs/ioswitch2/ops2/shard_store.go View File

@@ -11,7 +11,7 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

@@ -41,7 +41,7 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("reading from shard store")
defer logger.Debugf("reading from shard store finished")

stgMgr, err := exec.GetValueByType[*mgr.Manager](ctx)
stgMgr, err := exec.GetValueByType[*svcmgr.Manager](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}
@@ -83,7 +83,7 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("writting file to shard store")
defer logger.Debugf("write to shard store finished")

stgMgr, err := exec.GetValueByType[*mgr.Manager](ctx)
stgMgr, err := exec.GetValueByType[*svcmgr.Manager](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}


+ 2
- 2
common/pkgs/ioswitch2/ops2/shared_store.go View File

@@ -8,7 +8,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
)

func init() {
@@ -30,7 +30,7 @@ func (o *SharedLoad) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("load file to shared store")
defer logger.Debugf("load file to shared store finished")

stgMgr, err := exec.GetValueByType[*mgr.Manager](ctx)
stgMgr, err := exec.GetValueByType[*svcmgr.Manager](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}


+ 3
- 3
common/pkgs/ioswitchlrc/ops2/shard_store.go View File

@@ -11,7 +11,7 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

@@ -41,7 +41,7 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("reading from shard store")
defer logger.Debugf("reading from shard store finished")

stgMgr, err := exec.GetValueByType[*mgr.Manager](ctx)
stgMgr, err := exec.GetValueByType[*svcmgr.Manager](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}
@@ -83,7 +83,7 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("writting file to shard store")
defer logger.Debugf("write to shard store finished")

stgMgr, err := exec.GetValueByType[*mgr.Manager](ctx)
stgMgr, err := exec.GetValueByType[*svcmgr.Manager](ctx)
if err != nil {
return fmt.Errorf("getting storage manager: %w", err)
}


+ 58
- 0
common/pkgs/storage/factory/factory.go View File

@@ -0,0 +1,58 @@
package factory

import (
"reflect"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/reflect2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

// 创建一个在MasterHub上长期运行的存储服务
type StorageServiceBuilder func(detail stgmod.StorageDetail) (types.StorageService, error)

// 根据存储服务信息创建一个指定类型的组件
type StorageComponentBuilder func(detail stgmod.StorageDetail, typ reflect.Type) (any, error)

type StorageBuilder struct {
CreateService StorageServiceBuilder
CreateComponent StorageComponentBuilder
}

var storageBuilders = make(map[reflect.Type]StorageBuilder)

// 注册针对指定存储服务类型的Builder
func RegisterBuilder[T cdssdk.StorageType](createSvc StorageServiceBuilder, createComp StorageComponentBuilder) {
storageBuilders[reflect2.TypeOf[T]()] = StorageBuilder{
CreateService: createSvc,
CreateComponent: createComp,
}
}

func CreateService(detail stgmod.StorageDetail) (types.StorageService, error) {
typ := reflect.TypeOf(detail.Storage.Type)
bld, ok := storageBuilders[typ]
if !ok {
return nil, types.ErrUnsupportedStorageType
}

return bld.CreateService(detail)
}

func CreateComponent[T any](detail stgmod.StorageDetail) (T, error) {
typ := reflect.TypeOf(detail.Storage.Type)
bld, ok := storageBuilders[typ]
if !ok {
var def T
return def, types.ErrUnsupportedStorageType
}

comp, err := bld.CreateComponent(detail, reflect2.TypeOf[T]())
if err != nil {
var def T
return def, err
}

return comp.(T), nil
}

+ 53
- 0
common/pkgs/storage/local/local.go View File

@@ -0,0 +1,53 @@
package local

import (
"fmt"
"reflect"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

func init() {
factory.RegisterBuilder[*cdssdk.LocalStorageType](createService, createComponent)
}

func createService(detail stgmod.StorageDetail) (types.StorageService, error) {
svc := &Service{}

if detail.Storage.ShardStore != nil {
local, ok := detail.Storage.ShardStore.(*cdssdk.LocalShardStorage)
if !ok {
return nil, fmt.Errorf("invalid shard store type %T for local storage", detail.Storage.ShardStore)
}

store, err := NewShardStore(svc, *local)
if err != nil {
return nil, err
}

svc.ShardStore = store
}

if detail.Storage.SharedStore != nil {
local, ok := detail.Storage.SharedStore.(*cdssdk.LocalSharedStorage)
if !ok {
return nil, fmt.Errorf("invalid shared store type %T for local storage", detail.Storage.SharedStore)
}

store, err := NewSharedStore(svc, *local)
if err != nil {
return nil, err
}

svc.SharedStore = store
}

return svc, nil
}

func createComponent(detail stgmod.StorageDetail, typ reflect.Type) (any, error) {
return nil, types.ErrUnsupportedComponent
}

+ 58
- 0
common/pkgs/storage/local/service.go View File

@@ -0,0 +1,58 @@
package local

import (
"reflect"

"gitlink.org.cn/cloudream/common/utils/reflect2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type Service struct {
Detail stgmod.StorageDetail
ShardStore *ShardStore
SharedStore *SharedStore
}

func (s *Service) Info() stgmod.StorageDetail {
return s.Detail
}

func (s *Service) GetComponent(typ reflect.Type) (any, error) {
switch typ {
case reflect2.TypeOf[types.ShardStore]():
if s.ShardStore == nil {
return nil, types.ErrComponentNotFound
}
return s.ShardStore, nil

case reflect2.TypeOf[types.SharedStore]():
if s.SharedStore == nil {
return nil, types.ErrComponentNotFound
}
return s.SharedStore, nil

default:
return nil, types.ErrComponentNotFound
}
}

func (s *Service) Start(ch *types.StorageEventChan) {
if s.ShardStore != nil {
s.ShardStore.Start(ch)
}

if s.SharedStore != nil {
s.SharedStore.Start(ch)
}
}

func (s *Service) Stop() {
if s.ShardStore != nil {
s.ShardStore.Stop()
}

if s.SharedStore != nil {
s.SharedStore.Stop()
}
}

+ 4
- 9
common/pkgs/storage/local/shard_store.go View File

@@ -25,21 +25,16 @@ const (
)

type ShardStore struct {
stg cdssdk.Storage
svc *Service
cfg cdssdk.LocalShardStorage
lock sync.Mutex
workingTempFiles map[string]bool
done chan any
}

func NewShardStore(stg cdssdk.Storage, cfg cdssdk.LocalShardStorage) (*ShardStore, error) {
_, ok := stg.Address.(*cdssdk.LocalStorageAddress)
if !ok {
return nil, fmt.Errorf("storage address(%T) is not local", stg)
}

func NewShardStore(svc *Service, cfg cdssdk.LocalShardStorage) (*ShardStore, error) {
return &ShardStore{
stg: stg,
svc: svc,
cfg: cfg,
workingTempFiles: make(map[string]bool),
done: make(chan any, 1),
@@ -376,7 +371,7 @@ func (s *ShardStore) Stats() types.Stats {
}

func (s *ShardStore) getLogger() logger.Logger {
return logger.WithField("ShardStore", "Local").WithField("Storage", s.stg.String())
return logger.WithField("ShardStore", "Local").WithField("Storage", s.svc.Detail.Storage.String())
}

func (s *ShardStore) getFileDirFromHash(hash cdssdk.FileHash) string {


+ 4
- 9
common/pkgs/storage/local/shared_store.go View File

@@ -17,19 +17,14 @@ import (
)

type SharedStore struct {
stg cdssdk.Storage
svc *Service
cfg cdssdk.LocalSharedStorage
// lock sync.Mutex
}

func NewSharedStore(stg cdssdk.Storage, cfg cdssdk.LocalSharedStorage) (*SharedStore, error) {
_, ok := stg.Address.(*cdssdk.LocalStorageAddress)
if !ok {
return nil, fmt.Errorf("storage address(%T) is not local", stg)
}

func NewSharedStore(svc *Service, cfg cdssdk.LocalSharedStorage) (*SharedStore, error) {
return &SharedStore{
stg: stg,
svc: svc,
cfg: cfg,
}, nil
}
@@ -192,7 +187,7 @@ func (s *SharedStore) PackageGC(avaiables []stgmod.LoadedPackageID) error {
}

func (s *SharedStore) getLogger() logger.Logger {
return logger.WithField("SharedStore", "Local").WithField("Storage", s.stg.String())
return logger.WithField("SharedStore", "Local").WithField("Storage", s.svc.Detail.Storage.String())
}

type PackageWriter struct {


+ 0
- 10
common/pkgs/storage/mgr/create_components.go View File

@@ -1,10 +0,0 @@
package mgr

import (
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

func createComponents(detail stgmod.StorageDetail, ch *types.StorageEventChan, stg *storage) error {
return nil
}

+ 0
- 27
common/pkgs/storage/mgr/create_shardstore.go View File

@@ -1,27 +0,0 @@
package mgr

import (
"fmt"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/local"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

func createShardStore(detail stgmod.StorageDetail, ch *types.StorageEventChan, stg *storage) error {
switch confg := detail.Storage.ShardStore.(type) {
case *cdssdk.LocalShardStorage:
store, err := local.NewShardStore(detail.Storage, *confg)
if err != nil {
return fmt.Errorf("new local shard store: %v", err)
}

store.Start(ch)
stg.Shard = store
return nil

default:
return fmt.Errorf("unsupported shard store type: %T", confg)
}
}

+ 0
- 27
common/pkgs/storage/mgr/create_sharedstore.go View File

@@ -1,27 +0,0 @@
package mgr

import (
"fmt"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/local"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

func createSharedStore(detail stgmod.StorageDetail, ch *types.StorageEventChan, stg *storage) error {
switch confg := detail.Storage.SharedStore.(type) {
case *cdssdk.LocalSharedStorage:
store, err := local.NewSharedStore(detail.Storage, *confg)
if err != nil {
return fmt.Errorf("new local shard store: %v", err)
}

store.Start(ch)
stg.Shared = store
return nil

default:
return fmt.Errorf("unsupported shard store type: %T", confg)
}
}

+ 0
- 172
common/pkgs/storage/mgr/mgr.go View File

@@ -1,172 +0,0 @@
package mgr

import (
"errors"
"reflect"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/async"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/reflect2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

var ErrStorageNotFound = errors.New("storage not found")

var ErrComponentNotFound = errors.New("component not found")

var ErrStorageExists = errors.New("storage already exists")

type storage struct {
Shard types.ShardStore
Shared types.SharedStore
Temp types.TempStore
Components []types.StorageComponent
}

type Manager struct {
storages map[cdssdk.StorageID]*storage
lock sync.Mutex
eventChan *types.StorageEventChan
}

func NewManager() *Manager {
return &Manager{
storages: make(map[cdssdk.StorageID]*storage),
eventChan: async.NewUnboundChannel[types.StorageEvent](),
}
}

func (m *Manager) InitStorage(detail stgmod.StorageDetail) error {
m.lock.Lock()
defer m.lock.Unlock()

if _, ok := m.storages[detail.Storage.StorageID]; ok {
return ErrStorageExists
}

stg := &storage{}

if detail.Storage.ShardStore != nil {
err := createShardStore(detail, m.eventChan, stg)
if err != nil {
stopStorage(stg)
return err
}
}

if detail.Storage.SharedStore != nil {
err := createSharedStore(detail, m.eventChan, stg)
if err != nil {
stopStorage(stg)
return err
}
}

// 创建其他组件
err := createComponents(detail, m.eventChan, stg)
if err != nil {
stopStorage(stg)
return err
}

m.storages[detail.Storage.StorageID] = stg
return nil
}

func stopStorage(stg *storage) {
if stg.Shard != nil {
stg.Shard.Stop()
}

if stg.Shared != nil {
stg.Shared.Stop()
}

for _, c := range stg.Components {
c.Stop()
}
}

// 查找指定Storage的ShardStore组件
func (m *Manager) GetShardStore(stgID cdssdk.StorageID) (types.ShardStore, error) {
m.lock.Lock()
defer m.lock.Unlock()

stg := m.storages[stgID]
if stg == nil {
return nil, ErrStorageNotFound
}

if stg.Shard == nil {
return nil, ErrComponentNotFound
}

return stg.Shard, nil
}

// 查找指定Storage的SharedStore组件
func (m *Manager) GetSharedStore(stgID cdssdk.StorageID) (types.SharedStore, error) {
m.lock.Lock()
defer m.lock.Unlock()

stg := m.storages[stgID]
if stg == nil {
return nil, ErrStorageNotFound
}

if stg.Shared == nil {
return nil, ErrComponentNotFound
}

return stg.Shared, nil
}

func (m *Manager) GetTempStore(stgID cdssdk.StorageID) (types.TempStore, error) {
return nil, nil
}

// 查找指定Storage的指定类型的组件,可以是ShardStore、SharedStore、或者其他自定义的组件
func (m *Manager) GetComponent(stgID cdssdk.StorageID, typ reflect.Type) (types.StorageComponent, error) {
m.lock.Lock()
defer m.lock.Unlock()

stg := m.storages[stgID]
if stg == nil {
return nil, ErrStorageNotFound
}

switch typ {
case reflect2.TypeOf[types.ShardStore]():
if stg.Shard == nil {
return nil, ErrComponentNotFound
}

return stg.Shard, nil
case reflect2.TypeOf[types.SharedStore]():
if stg.Shared == nil {
return nil, ErrComponentNotFound
}

return stg.Shared, nil
default:
for _, c := range stg.Components {
if reflect.TypeOf(c) == typ {
return c, nil
}
}

return nil, ErrComponentNotFound
}
}

func GetComponent[T types.StorageComponent](mgr *Manager, stgID cdssdk.StorageID) (T, error) {
ret, err := mgr.GetComponent(stgID, reflect2.TypeOf[T]())
if err != nil {
var def T
return def, err
}

return ret.(T), nil
}

+ 85
- 0
common/pkgs/storage/svcmgr/mgr.go View File

@@ -0,0 +1,85 @@
package svcmgr

import (
"reflect"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/async"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/reflect2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type storage struct {
Service types.StorageService
}

type Manager struct {
storages map[cdssdk.StorageID]*storage
lock sync.Mutex
eventChan *types.StorageEventChan
}

func NewManager() *Manager {
return &Manager{
storages: make(map[cdssdk.StorageID]*storage),
eventChan: async.NewUnboundChannel[types.StorageEvent](),
}
}

func (m *Manager) CreateService(detail stgmod.StorageDetail) error {
m.lock.Lock()
defer m.lock.Unlock()

if _, ok := m.storages[detail.Storage.StorageID]; ok {
return types.ErrStorageExists
}

stg := &storage{}

svc, err := factory.CreateService(detail)
if err != nil {
return err
}

stg.Service = svc
m.storages[detail.Storage.StorageID] = stg

svc.Start(m.eventChan)
return nil
}

// 查找指定Storage的ShardStore组件
func (m *Manager) GetShardStore(stgID cdssdk.StorageID) (types.ShardStore, error) {
return GetComponent[types.ShardStore](m, stgID)
}

// 查找指定Storage的SharedStore组件
func (m *Manager) GetSharedStore(stgID cdssdk.StorageID) (types.SharedStore, error) {
return GetComponent[types.SharedStore](m, stgID)
}

// 查找指定Storage的指定类型的组件,可以是ShardStore、SharedStore、或者其他自定义的组件
func (m *Manager) GetComponent(stgID cdssdk.StorageID, typ reflect.Type) (any, error) {
m.lock.Lock()
defer m.lock.Unlock()

stg := m.storages[stgID]
if stg == nil {
return nil, types.ErrStorageNotFound
}

return stg.Service.GetComponent(typ)
}

func GetComponent[T any](mgr *Manager, stgID cdssdk.StorageID) (T, error) {
ret, err := mgr.GetComponent(stgID, reflect2.TypeOf[T]())
if err != nil {
var def T
return def, err
}

return ret.(T), nil
}

+ 0
- 7
common/pkgs/storage/temp/temp_store.go View File

@@ -1,7 +0,0 @@
package tempstore

type TempStore interface {
CreateTemp()
Commited(objectName string)
Drop(objectName string)
}

+ 0
- 1
common/pkgs/storage/types/bypass.go View File

@@ -3,6 +3,5 @@ package types
import "io"

type BypassWriter interface {
StorageComponent
Write(stream io.Reader) (string, error)
}

+ 0
- 1
common/pkgs/storage/types/faas.go View File

@@ -3,6 +3,5 @@ package types
import "context"

type InternalFaaSCall interface {
StorageComponent
GalMultiply(ctx context.Context, coef [][]byte, inputs []string, outputs []string, chunkSize int) error
}

+ 2
- 1
common/pkgs/storage/types/shard_store.go View File

@@ -23,7 +23,8 @@ type StoreEvent interface {
}

type ShardStore interface {
StorageComponent
Start(ch *StorageEventChan)
Stop()
// 写入一个新文件,写入后获得FileHash
Create(stream io.Reader) (FileInfo, error)
// 使用F函数创建Option对象


+ 2
- 1
common/pkgs/storage/types/shared_store.go View File

@@ -8,7 +8,8 @@ import (
)

type SharedStore interface {
StorageComponent
Start(ch *StorageEventChan)
Stop()
// 写入一个文件到Package的调度目录下,返回值为文件路径:userID/pkgID/path
WritePackageObject(userID cdssdk.UserID, pkgID cdssdk.PackageID, path string, stream io.Reader) (string, error)
// 获取所有已加载的Package信息


+ 0
- 1
common/pkgs/storage/types/temp_store.go View File

@@ -1,7 +1,6 @@
package types

type TempStore interface {
StorageComponent
// 生成并注册一个临时文件名。在名字有效期间此临时文件不会被清理
CreateTemp() string
// 指示一个临时文件已经被移动作它用,不需要再关注它了(也不需要删除这个文件)。


+ 21
- 2
common/pkgs/storage/types/types.go View File

@@ -1,12 +1,31 @@
package types

import "gitlink.org.cn/cloudream/common/pkgs/async"
import (
"errors"
"reflect"

"gitlink.org.cn/cloudream/common/pkgs/async"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
)

var ErrStorageNotFound = errors.New("storage not found")

var ErrComponentNotFound = errors.New("component not found")

var ErrStorageExists = errors.New("storage already exists")

var ErrUnsupportedStorageType = errors.New("unsupported storage type")

var ErrUnsupportedComponent = errors.New("unsupported component type")

type StorageEvent interface{}

type StorageEventChan = async.UnboundChannel[StorageEvent]

type StorageComponent interface {
// 代表一个长期运行在MasterHub上的存储服务
type StorageService interface {
Info() stgmod.StorageDetail
GetComponent(typ reflect.Type) (any, error)
Start(ch *StorageEventChan)
Stop()
}

+ 3
- 3
common/pkgs/uploader/uploader.go View File

@@ -15,16 +15,16 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
)

type Uploader struct {
distlock *distlock.Service
connectivity *connectivity.Collector
stgMgr *mgr.Manager
stgMgr *svcmgr.Manager
}

func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgMgr *mgr.Manager) *Uploader {
func NewUploader(distlock *distlock.Service, connectivity *connectivity.Collector, stgMgr *svcmgr.Manager) *Uploader {
return &Uploader{
distlock: distlock,
connectivity: connectivity,


+ 3
- 3
scanner/internal/event/event.go View File

@@ -9,13 +9,13 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/typedispatcher"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"
scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
)

type ExecuteArgs struct {
DB *db2.DB
DistLock *distlock.Service
StgMgr *mgr.Manager
StgMgr *svcmgr.Manager
}

type Executor = event.Executor[ExecuteArgs]
@@ -26,7 +26,7 @@ type Event = event.Event[ExecuteArgs]

type ExecuteOption = event.ExecuteOption

func NewExecutor(db *db2.DB, distLock *distlock.Service, stgMgr *mgr.Manager) Executor {
func NewExecutor(db *db2.DB, distLock *distlock.Service, stgMgr *svcmgr.Manager) Executor {
return event.NewExecutor(ExecuteArgs{
DB: db,
DistLock: distLock,


+ 2
- 2
scanner/main.go View File

@@ -10,7 +10,7 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/scanner/internal/config"
"gitlink.org.cn/cloudream/storage/scanner/internal/event"
"gitlink.org.cn/cloudream/storage/scanner/internal/mq"
@@ -48,7 +48,7 @@ func main() {
go serveDistLock(distlockSvc)

// 启动存储服务管理器
stgMgr := mgr.NewManager()
stgMgr := svcmgr.NewManager()

// 启动事件执行器
eventExecutor := event.NewExecutor(db, distlockSvc, stgMgr)


Loading…
Cancel
Save