Browse Source

调整storage的类型定义

gitlink
Sydonian 1 year ago
parent
commit
2071be8334
11 changed files with 42 additions and 223 deletions
  1. +0
    -113
      common/pkgs/storage/agtpool/pool.go
  2. +2
    -2
      common/pkgs/storage/factory/factory.go
  3. +3
    -3
      common/pkgs/storage/factory/reg/reg.go
  4. +4
    -4
      common/pkgs/storage/types/bypass.go
  5. +13
    -27
      common/pkgs/storage/types/empty_builder.go
  6. +0
    -3
      common/pkgs/storage/types/public_store.go
  7. +4
    -4
      common/pkgs/storage/types/s2s.go
  8. +6
    -10
      common/pkgs/storage/types/shard_store.go
  9. +0
    -10
      common/pkgs/storage/types/temp_store.go
  10. +7
    -36
      common/pkgs/storage/types/types.go
  11. +3
    -11
      common/pkgs/storage/utils/utils.go

+ 0
- 113
common/pkgs/storage/agtpool/pool.go View File

@@ -1,113 +0,0 @@
package agtpool

import (
"sync"

"gitlink.org.cn/cloudream/common/pkgs/async"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types"
)

type storage struct {
Agent types.StorageAgent
}

type AgentPool struct {
storages map[cdssdk.StorageID]*storage
lock sync.Mutex
eventChan *types.StorageEventChan
}

func NewPool() *AgentPool {
return &AgentPool{
storages: make(map[cdssdk.StorageID]*storage),
eventChan: async.NewUnboundChannel[types.StorageEvent](),
}
}

func (m *AgentPool) SetupAgent(detail stgmod.StorageDetail) error {
m.lock.Lock()
defer m.lock.Unlock()

if _, ok := m.storages[detail.Storage.StorageID]; ok {
return types.ErrStorageExists
}

stg := &storage{}

bld := factory.GetBuilder(detail)
svc, err := bld.CreateAgent()
if err != nil {
return err
}

stg.Agent = svc
m.storages[detail.Storage.StorageID] = stg

svc.Start(m.eventChan)
return nil
}

func (m *AgentPool) GetInfo(stgID cdssdk.StorageID) (stgmod.StorageDetail, error) {
m.lock.Lock()
defer m.lock.Unlock()

stg := m.storages[stgID]
if stg == nil {
return stgmod.StorageDetail{}, types.ErrStorageNotFound
}

return stg.Agent.Info(), nil
}

func (m *AgentPool) GetAgent(stgID cdssdk.StorageID) (types.StorageAgent, error) {
m.lock.Lock()
defer m.lock.Unlock()

stg := m.storages[stgID]
if stg == nil {
return nil, types.ErrStorageNotFound
}

return stg.Agent, nil
}

func (m *AgentPool) GetAllAgents() []types.StorageAgent {
m.lock.Lock()
defer m.lock.Unlock()

agents := make([]types.StorageAgent, 0, len(m.storages))
for _, stg := range m.storages {
agents = append(agents, stg.Agent)
}

return agents
}

// 查找指定Storage的ShardStore组件
func (m *AgentPool) GetShardStore(stgID cdssdk.StorageID) (types.ShardStore, error) {
m.lock.Lock()
defer m.lock.Unlock()

stg := m.storages[stgID]
if stg == nil {
return nil, types.ErrStorageNotFound
}

return stg.Agent.GetShardStore()
}

// 查找指定Storage的PublicStore组件
func (m *AgentPool) GetPublicStore(stgID cdssdk.StorageID) (types.PublicStore, error) {
m.lock.Lock()
defer m.lock.Unlock()

stg := m.storages[stgID]
if stg == nil {
return nil, types.ErrStorageNotFound
}

return stg.Agent.GetPublicStore()
}

+ 2
- 2
common/pkgs/storage/factory/factory.go View File

@@ -3,7 +3,7 @@ package factory
import (
"reflect"

stgmod "gitlink.org.cn/cloudream/storage2/common/models"
clitypes "gitlink.org.cn/cloudream/storage2/client/types"
_ "gitlink.org.cn/cloudream/storage2/common/pkgs/storage"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory/reg"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types"
@@ -11,7 +11,7 @@ import (

// 此函数永远不会返回nil。如果找不到对应的Builder,则会返回EmptyBuilder,
// 此Builder的所有函数都会返回否定值或者封装后的ErrUnsupported错误(需要使用errors.Is检查)
func GetBuilder(detail stgmod.StorageDetail) types.StorageBuilder {
func GetBuilder(detail *clitypes.UserSpaceDetail) types.StorageBuilder {
typ := reflect.TypeOf(detail.Storage.Type)

ctor, ok := reg.StorageBuilders[typ]


+ 3
- 3
common/pkgs/storage/factory/reg/reg.go View File

@@ -5,11 +5,11 @@ import (

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/reflect2"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
clitypes "gitlink.org.cn/cloudream/storage2/client/types"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types"
)

type BuilderCtor func(detail stgmod.StorageDetail) types.StorageBuilder
type BuilderCtor func(detail *clitypes.UserSpaceDetail) types.StorageBuilder

var StorageBuilders = make(map[reflect.Type]BuilderCtor)

@@ -21,7 +21,7 @@ func RegisterBuilder[T cdssdk.StorageType](ctor BuilderCtor) {
// 注:此函数只给storage包内部使用,外部包请使用外层的factory.GetBuilder
// 此函数永远不会返回nil。如果找不到对应的Builder,则会返回EmptyBuilder,
// 此Builder的所有函数都会返回否定值或者封装后的ErrUnsupported错误(需要使用errors.Is检查)
func GetBuilderInternal(detail stgmod.StorageDetail) types.StorageBuilder {
func GetBuilderInternal(detail *clitypes.UserSpaceDetail) types.StorageBuilder {
typ := reflect.TypeOf(detail.Storage.Type)

ctor, ok := StorageBuilders[typ]


+ 4
- 4
common/pkgs/storage/types/bypass.go View File

@@ -1,13 +1,13 @@
package types

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
clitypes "gitlink.org.cn/cloudream/storage2/client/types"
)

// 通过旁路上传后的文件的信息
type BypassUploadedFile struct {
Path string
Hash cdssdk.FileHash
Hash clitypes.FileHash
Size int64
}

@@ -26,13 +26,13 @@ type BypassFilePath struct {
// 不通过ShardStore读取文件,但需要它返回文件的路径。
// 仅用于分片存储。
type BypassRead interface {
BypassRead(fileHash cdssdk.FileHash) (BypassFilePath, error)
BypassRead(fileHash clitypes.FileHash) (BypassFilePath, error)
}

// 能通过一个Http请求直接访问文件
// 仅用于分片存储。
type HTTPBypassRead interface {
HTTPBypassRead(fileHash cdssdk.FileHash) (HTTPRequest, error)
HTTPBypassRead(fileHash clitypes.FileHash) (HTTPRequest, error)
}

type HTTPRequest struct {


+ 13
- 27
common/pkgs/storage/types/empty_builder.go View File

@@ -3,27 +3,24 @@ package types
import (
"fmt"

stgmod "gitlink.org.cn/cloudream/storage2/common/models"
clitypes "gitlink.org.cn/cloudream/storage2/client/types"
)

type EmptyBuilder struct {
Detail stgmod.StorageDetail
Detail clitypes.UserSpaceDetail
}

// 创建一个在MasterHub上长期运行的存储服务
func (b *EmptyBuilder) CreateAgent() (StorageAgent, error) {
return nil, fmt.Errorf("create agent for %T: %w", b.Detail.Storage.Type, ErrUnsupported)
func (b *EmptyBuilder) FeatureDesc() FeatureDesc {
return &EmptyFeatureDesc{}
}

func (b *EmptyBuilder) ShardStoreDesc() ShardStoreDesc {
return &EmptyShardStoreDesc{}
func (b *EmptyBuilder) CreateShardStore() (ShardStore, error) {
return nil, fmt.Errorf("create shard store for %T: %w", b.Detail.Storage.Type, ErrUnsupported)
}

func (b *EmptyBuilder) PublicStoreDesc() PublicStoreDesc {
return &EmptyPublicStoreDesc{}
func (b *EmptyBuilder) CreatePublicStore() (PublicStore, error) {
return nil, fmt.Errorf("create public store for %T: %w", b.Detail.Storage.Type, ErrUnsupported)
}

// 创建一个分片上传组件
func (b *EmptyBuilder) CreateMultiparter() (Multiparter, error) {
return nil, fmt.Errorf("create multipart initiator for %T: %w", b.Detail.Storage.Type, ErrUnsupported)
}
@@ -36,32 +33,21 @@ func (b *EmptyBuilder) CreateECMultiplier() (ECMultiplier, error) {
return nil, fmt.Errorf("create ec multiplier for %T: %w", b.Detail.Storage.Type, ErrUnsupported)
}

type EmptyShardStoreDesc struct {
type EmptyFeatureDesc struct {
}

func (d *EmptyShardStoreDesc) Enabled() bool {
func (d *EmptyFeatureDesc) Enabled() bool {
return false
}

func (d *EmptyShardStoreDesc) HasBypassWrite() bool {
func (d *EmptyFeatureDesc) HasBypassWrite() bool {
return false
}

func (d *EmptyShardStoreDesc) HasBypassRead() bool {
return false
}

func (d *EmptyShardStoreDesc) HasBypassHTTPRead() bool {
return false
}

type EmptyPublicStoreDesc struct {
}

func (d *EmptyPublicStoreDesc) Enabled() bool {
func (d *EmptyFeatureDesc) HasBypassRead() bool {
return false
}

func (d *EmptyPublicStoreDesc) HasBypassWrite() bool {
func (d *EmptyFeatureDesc) HasBypassHTTPRead() bool {
return false
}

+ 0
- 3
common/pkgs/storage/types/public_store.go View File

@@ -5,9 +5,6 @@ import (
)

type PublicStore interface {
Start(ch *StorageEventChan)
Stop()

Write(objectPath string, stream io.Reader) error
Read(objectPath string) (io.ReadCloser, error)
// 返回指定路径下的所有文件


+ 4
- 4
common/pkgs/storage/types/s2s.go View File

@@ -3,14 +3,14 @@ package types
import (
"context"

stgmod "gitlink.org.cn/cloudream/storage2/common/models"
clitypes "gitlink.org.cn/cloudream/storage2/client/types"
)

type S2STransfer interface {
// 判断是否能从指定的源存储中直传到当前存储的目的路径
CanTransfer(src stgmod.StorageDetail) bool
// 判断是否能从指定的源存储中直传到当前存储的目的路径。仅在生成计划时使用
CanTransfer(src clitypes.UserSpaceDetail) bool
// 执行数据直传。返回传输后的文件路径
Transfer(ctx context.Context, src stgmod.StorageDetail, srcPath string) (string, error)
Transfer(ctx context.Context, src clitypes.UserSpaceDetail, srcPath string) (string, error)
// 完成传输
Complete()
// 取消传输。如果已经调用了Complete,则这个方法应该无效果


+ 6
- 10
common/pkgs/storage/types/shard_store.go View File

@@ -4,7 +4,7 @@ import (
"fmt"
"io"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
clitypes "gitlink.org.cn/cloudream/storage2/client/types"
)

type Status interface {
@@ -30,22 +30,18 @@ type ShardStore interface {
// 使用F函数创建Option对象
Open(opt OpenOption) (io.ReadCloser, error)
// 获得指定文件信息
Info(fileHash cdssdk.FileHash) (FileInfo, error)
Info(fileHash clitypes.FileHash) (FileInfo, error)
// 获取所有文件信息,尽量保证操作是原子的
ListAll() ([]FileInfo, error)
// 垃圾清理。只保留availables中的文件,删除其他文件
GC(avaiables []cdssdk.FileHash) error
GC(avaiables []clitypes.FileHash) error
// 获得存储系统信息
Stats() Stats
}

type Config interface {
Build() (ShardStore, error)
}

type FileInfo struct {
// 文件的SHA256哈希值,全大写的16进制字符串格式
Hash cdssdk.FileHash
Hash clitypes.FileHash
Size int64
// 文件描述信息,比如文件名,用于调试
Description string
@@ -65,12 +61,12 @@ type Stats struct {
}

type OpenOption struct {
FileHash cdssdk.FileHash
FileHash clitypes.FileHash
Offset int64
Length int64
}

func NewOpen(fileHash cdssdk.FileHash) OpenOption {
func NewOpen(fileHash clitypes.FileHash) OpenOption {
return OpenOption{
FileHash: fileHash,
Offset: 0,


+ 0
- 10
common/pkgs/storage/types/temp_store.go View File

@@ -1,10 +0,0 @@
package types

type TempStore interface {
// 生成并注册一个临时文件路径,在Commited或Drop之前,此文件不会被清理。
CreateTemp() string
// 指示一个临时文件已经被移动作它用,不需要再关注它了(也不需要删除这个文件)。
Commited(filePath string)
// 临时文件被放弃,可以删除这个文件了。如果提前调用了Commited,则此函数应该什么也不做。
Drop(filePath string)
}

+ 7
- 36
common/pkgs/storage/types/types.go View File

@@ -4,7 +4,6 @@ import (
"errors"

"gitlink.org.cn/cloudream/common/pkgs/async"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
)

var ErrStorageNotFound = errors.New("storage not found")
@@ -18,34 +17,13 @@ type StorageEvent interface{}

type StorageEventChan = async.UnboundChannel[StorageEvent]

// 在MasterHub上运行,代理一个存储服务。
//
// 存放Storage的运行时数据。如果一个组件需要与Agent交互(比如实际是ShardStore功能的一部分),或者是需要长期运行,
// 那么就将该组件的Get函数放到StorageAgent接口中。可以同时在StorageBuilder中同时提供HasXXX函数,
// 用于判断该Storage是否支持某个功能,用于生成ioswitch计划时判断是否能利用此功能。
type StorageAgent interface {
Start(ch *StorageEventChan)
Stop()

Info() stgmod.StorageDetail
// 获取分片存储服务
GetShardStore() (ShardStore, error)
// 获取共享存储服务
GetPublicStore() (PublicStore, error)
}

// 创建存储服务的指定组件。
//
// 如果指定组件比较独立,不需要依赖运行时数据,或者不需要与Agent交互,那么就可以将Create函数放到这个接口中。
// 增加Has函数用于判断该Storage是否有某个组件。
// 如果Create函数仅仅只是创建一个结构体,没有其他副作用,那么也可以用Create函数来判断是否支持某个功能。
type StorageBuilder interface {
// 创建一个在MasterHub上长期运行的存储服务
CreateAgent() (StorageAgent, error)
// 是否支持分片存储服务
ShardStoreDesc() ShardStoreDesc
// 是否支持共享存储服务
PublicStoreDesc() PublicStoreDesc
// 关于此存储系统特性功能的描述
FeatureDesc() FeatureDesc
// 创建一个分片存储组件
CreateShardStore() (ShardStore, error)
// 创建一个公共存储组件
CreatePublicStore() (PublicStore, error)
// 创建一个分片上传组件
CreateMultiparter() (Multiparter, error)
// 创建一个存储服务直传组件
@@ -53,7 +31,7 @@ type StorageBuilder interface {
CreateECMultiplier() (ECMultiplier, error)
}

type ShardStoreDesc interface {
type FeatureDesc interface {
// 是否已启动
Enabled() bool
// 是否能旁路上传
@@ -63,10 +41,3 @@ type ShardStoreDesc interface {
// 是否能通过HTTP读取
HasBypassHTTPRead() bool
}

type PublicStoreDesc interface {
// 是否已启动
Enabled() bool
// 是否能旁路上传
HasBypassWrite() bool
}

+ 3
- 11
common/pkgs/storage/utils/utils.go View File

@@ -1,19 +1,11 @@
package utils

import (
"fmt"
"path/filepath"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types"
)

func MakeLoadedPackagePath(userID cdssdk.UserID, packageID cdssdk.PackageID) string {
return filepath.Join(fmt.Sprintf("%v", userID), fmt.Sprintf("%v", packageID))
}

func FindFeature[T cdssdk.StorageFeature](detail stgmod.StorageDetail) T {
for _, f := range detail.Storage.Features {
func FindFeature[T cortypes.StorageFeature](detail cortypes.Storage) T {
for _, f := range detail.Features {
f2, ok := f.(T)
if ok {
return f2


Loading…
Cancel
Save