Browse Source

解决分片上传的问题

gitlink
Sydonian 1 year ago
parent
commit
449c999c2a
20 changed files with 418 additions and 119 deletions
  1. +1
    -1
      agent/internal/cmd/serve.go
  2. +55
    -4
      client/internal/cmdline/test.go
  3. +14
    -4
      common/pkgs/ioswitch2/ops2/bypass.go
  4. +1
    -1
      common/pkgs/ioswitch2/ops2/clone.go
  5. +2
    -2
      common/pkgs/ioswitch2/ops2/driver.go
  6. +17
    -36
      common/pkgs/ioswitch2/ops2/multipart.go
  7. +1
    -1
      common/pkgs/ioswitch2/ops2/ops.go
  8. +2
    -2
      common/pkgs/ioswitch2/ops2/shard_store.go
  9. +2
    -2
      common/pkgs/ioswitch2/ops2/shared_store.go
  10. +48
    -10
      common/pkgs/ioswitch2/parser/parser.go
  11. +16
    -27
      common/pkgs/storage/factory/factory.go
  12. +19
    -0
      common/pkgs/storage/factory/reg/reg.go
  13. +31
    -3
      common/pkgs/storage/local/local.go
  14. +136
    -0
      common/pkgs/storage/local/multipart_upload.go
  15. +51
    -8
      common/pkgs/storage/local/shard_store.go
  16. +1
    -0
      common/pkgs/storage/obs/obs.go
  17. +1
    -0
      common/pkgs/storage/s3/shard_store.go
  18. +1
    -4
      common/pkgs/storage/types/bypass.go
  19. +8
    -10
      common/pkgs/storage/types/s3_client.go
  20. +11
    -4
      common/pkgs/storage/types/types.go

+ 1
- 1
agent/internal/cmd/serve.go View File

@@ -57,7 +57,7 @@ func serve(configPath string) {
for _, stg := range hubCfg.Storages {
err := stgMgr.CreateService(stg)
if err != nil {
fmt.Printf("init storage %v: %v", stg, err)
fmt.Printf("init storage %v: %v", stg.Storage.String(), err)
os.Exit(1)
}
}


+ 55
- 4
client/internal/cmdline/test.go View File

@@ -63,8 +63,8 @@ func init() {
},
})
rootCmd.AddCommand(&cobra.Command{
Use: "test",
Short: "test",
Use: "test1",
Short: "test1",
Run: func(cmd *cobra.Command, args []string) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
@@ -133,8 +133,8 @@ func init() {
})

rootCmd.AddCommand(&cobra.Command{
Use: "test3",
Short: "test3",
Use: "test4",
Short: "test4",
Run: func(cmd *cobra.Command, args []string) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
@@ -181,4 +181,55 @@ func init() {
fut.Wait(context.TODO())
},
})

rootCmd.AddCommand(&cobra.Command{
Use: "test",
Short: "test",
Run: func(cmd *cobra.Command, args []string) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
panic(err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

stgs, err := coorCli.GetStorageDetails(coormq.ReqGetStorageDetails([]cdssdk.StorageID{1, 2}))
if err != nil {
panic(err)
}

ft := ioswitch2.NewFromTo()
ft.SegmentParam = cdssdk.NewSegmentRedundancy(1293, 3)
ft.ECParam = &cdssdk.DefaultECRedundancy
ft.AddFrom(ioswitch2.NewFromShardstore("22CC59CE3297F78F2D20DC1E33181B77F21E6782097C94E1664F99F129834069", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(0)))
ft.AddFrom(ioswitch2.NewFromShardstore("5EAC20EB3EBC7B5FA176C5BD1C01041FB2A6D14C35D6A232CA83D7F1E4B01ADE", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(1)))
ft.AddFrom(ioswitch2.NewFromShardstore("A9BC1802F37100C80C72A1D6E8F53C0E0B73F85F99153D8C78FB01CEC9D8D903", *stgs.Storages[1].MasterHub, stgs.Storages[1].Storage, ioswitch2.SegmentStream(2)))
ft.AddTo(ioswitch2.NewToShardStoreWithRange(*stgs.Storages[0].MasterHub, *stgs.Storages[0], ioswitch2.RawStream(), "raw", exec.NewRange(10, 645)))

plans := exec.NewPlanBuilder()
err = parser.Parse(ft, plans)
if err != nil {
panic(err)
}

fmt.Printf("plans: %v\n", plans)

exec := plans.Execute(exec.NewExecContext())

fut := future.NewSetVoid()
go func() {
mp, err := exec.Wait(context.Background())
if err != nil {
panic(err)
}

for k, v := range mp {
fmt.Printf("%s: %v\n", k, v)
}

fut.SetVoid()
}()

fut.Wait(context.TODO())
},
})
}

+ 14
- 4
common/pkgs/ioswitch2/ops2/bypass.go View File

@@ -13,6 +13,7 @@ import (
func init() {
exec.UseOp[*BypassToShardStore]()
exec.UseVarValue[*BypassFileInfoValue]()
exec.UseVarValue[*BypassHandleResultValue]()
}

type BypassFileInfoValue struct {
@@ -39,6 +40,7 @@ type BypassToShardStore struct {
StorageID cdssdk.StorageID
BypassFileInfo exec.VarID
BypassCallback exec.VarID
FileHash exec.VarID
}

func (o *BypassToShardStore) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
@@ -68,6 +70,7 @@ func (o *BypassToShardStore) Execute(ctx *exec.ExecContext, e *exec.Executor) er
}

e.PutVar(o.BypassCallback, &BypassHandleResultValue{Commited: true})
e.PutVar(o.FileHash, &FileHashValue{Hash: fileInfo.FileHash})
return nil
}

@@ -77,17 +80,19 @@ func (o *BypassToShardStore) String() string {

type BypassToShardStoreNode struct {
dag.NodeBase
StorageID cdssdk.StorageID
StorageID cdssdk.StorageID
FileHashStoreKey string
}

func (b *GraphNodeBuilder) NewBypassToShardStore(storageID cdssdk.StorageID) *BypassToShardStoreNode {
func (b *GraphNodeBuilder) NewBypassToShardStore(storageID cdssdk.StorageID, fileHashStoreKey string) *BypassToShardStoreNode {
node := &BypassToShardStoreNode{
StorageID: storageID,
StorageID: storageID,
FileHashStoreKey: fileHashStoreKey,
}
b.AddNode(node)

node.InputValues().Init(1)
node.OutputValues().Init(node, 1)
node.OutputValues().Init(node, 2)
return node
}

@@ -102,10 +107,15 @@ func (n *BypassToShardStoreNode) BypassCallbackVar() *dag.ValueVar {
return n.OutputValues().Get(0)
}

func (n *BypassToShardStoreNode) FileHashVar() *dag.ValueVar {
return n.OutputValues().Get(1)
}

func (t *BypassToShardStoreNode) GenerateOp() (exec.Op, error) {
return &BypassToShardStore{
StorageID: t.StorageID,
BypassFileInfo: t.BypassFileInfoSlot().Var().VarID,
BypassCallback: t.BypassCallbackVar().VarID,
FileHash: t.FileHashVar().VarID,
}, nil
}

+ 1
- 1
common/pkgs/ioswitch2/ops2/clone.go View File

@@ -70,7 +70,7 @@ func (o *CloneVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
}

func (o *CloneVar) String() string {
return fmt.Sprintf("CloneStream %v -> (%v)", o.Raw, utils.FormatVarIDs(o.Cloneds))
return fmt.Sprintf("CloneVar %v -> (%v)", o.Raw, utils.FormatVarIDs(o.Cloneds))
}

type CloneStreamType struct {


+ 2
- 2
common/pkgs/ioswitch2/ops2/driver.go View File

@@ -67,8 +67,8 @@ func (t *ToDriverNode) SetInput(v *dag.StreamVar) {
v.To(t, 0)
}

func (t *ToDriverNode) Input() dag.StreamOutputSlot {
return dag.StreamOutputSlot{
func (t *ToDriverNode) Input() dag.StreamInputSlot {
return dag.StreamInputSlot{
Node: t,
Index: 0,
}


+ 17
- 36
common/pkgs/ioswitch2/ops2/multipart.go View File

@@ -7,10 +7,8 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/svcmgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

@@ -22,7 +20,6 @@ func init() {
}

type MultipartUploadArgsValue struct {
Key string
InitState types.MultipartInitState
}

@@ -43,7 +40,7 @@ func (v *UploadedPartInfoValue) Clone() exec.VarValue {
}

type MultipartInitiator struct {
StorageID cdssdk.StorageID
Storage stgmod.StorageDetail
UploadArgs exec.VarID
UploadedParts []exec.VarID
BypassFileOutput exec.VarID // 分片上传之后的临时文件的路径
@@ -51,31 +48,19 @@ type MultipartInitiator struct {
}

func (o *MultipartInitiator) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
stgMgr, err := exec.GetValueByType[*svcmgr.Manager](ctx)
if err != nil {
return err
}

tempStore, err := svcmgr.GetComponent[types.TempStore](stgMgr, o.StorageID)
if err != nil {
return err
}
objName := tempStore.CreateTemp()
defer tempStore.Drop(objName)

initiator, err := svcmgr.GetComponent[types.MultipartInitiator](stgMgr, o.StorageID)
initiator, err := factory.CreateComponent[types.MultipartInitiator](o.Storage)
if err != nil {
return err
}
defer initiator.Abort()

// 启动一个新的上传任务
initState, err := initiator.Initiate(ctx.Context, objName)
initState, err := initiator.Initiate(ctx.Context)
if err != nil {
return err
}
// 分发上传参数
e.PutVar(o.UploadArgs, &MultipartUploadArgsValue{
Key: objName,
InitState: initState,
})

@@ -90,18 +75,15 @@ func (o *MultipartInitiator) Execute(ctx *exec.ExecContext, e *exec.Executor) er
partInfos[i] = v.UploadedPartInfo
}

// 完成分片上传
compInfo, err := initiator.Complete(ctx.Context, partInfos)
// 合并分片
fileInfo, err := initiator.JoinParts(ctx.Context, partInfos)
if err != nil {
return fmt.Errorf("completing multipart upload: %v", err)
}

// 告知后续Op临时文件的路径
e.PutVar(o.BypassFileOutput, &BypassFileInfoValue{
BypassFileInfo: types.BypassFileInfo{
TempFilePath: objName,
FileHash: compInfo.FileHash,
},
BypassFileInfo: fileInfo,
})

// 等待后续Op处理临时文件
@@ -111,14 +93,14 @@ func (o *MultipartInitiator) Execute(ctx *exec.ExecContext, e *exec.Executor) er
}

if cb.Commited {
tempStore.Commited(objName)
initiator.Complete()
}

return nil
}

func (o *MultipartInitiator) String() string {
return "MultipartInitiator"
return fmt.Sprintf("MultipartInitiator Args: %v, Parts: %v, BypassFileOutput: %v, BypassCallback: %v", o.UploadArgs, o.UploadedParts, o.BypassFileOutput, o.BypassCallback)
}

type MultipartUpload struct {
@@ -132,7 +114,7 @@ type MultipartUpload struct {

func (o *MultipartUpload) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
uploadArgs, err := exec.BindVar[*MultipartUploadArgsValue](e, ctx.Context, o.UploadArgs)
if err == nil {
if err != nil {
return err
}

@@ -148,12 +130,11 @@ func (o *MultipartUpload) Execute(ctx *exec.ExecContext, e *exec.Executor) error
}

startTime := time.Now()
uploadedInfo, err := uploader.UploadPart(ctx.Context, uploadArgs.InitState, uploadArgs.Key, o.PartSize, o.PartNumber, partStr.Stream)
log.Debugf("upload finished in %v", time.Since(startTime))

uploadedInfo, err := uploader.UploadPart(ctx.Context, uploadArgs.InitState, o.PartSize, o.PartNumber, partStr.Stream)
if err != nil {
return err
}
log.Debugf("upload finished in %v", time.Since(startTime))

e.PutVar(o.UploadResult, &UploadedPartInfoValue{
uploadedInfo,
@@ -168,12 +149,12 @@ func (o *MultipartUpload) String() string {

type MultipartInitiatorNode struct {
dag.NodeBase
StorageID cdssdk.StorageID `json:"storageID"`
Storage stgmod.StorageDetail `json:"storageID"`
}

func (b *GraphNodeBuilder) NewMultipartInitiator(storageID cdssdk.StorageID) *MultipartInitiatorNode {
func (b *GraphNodeBuilder) NewMultipartInitiator(storage stgmod.StorageDetail) *MultipartInitiatorNode {
node := &MultipartInitiatorNode{
StorageID: storageID,
Storage: storage,
}
b.AddNode(node)

@@ -200,13 +181,13 @@ func (n *MultipartInitiatorNode) BypassCallbackSlot() dag.ValueInputSlot {
func (n *MultipartInitiatorNode) AppendPartInfoSlot() dag.ValueInputSlot {
return dag.ValueInputSlot{
Node: n,
Index: n.InputStreams().EnlargeOne(),
Index: n.InputValues().EnlargeOne(),
}
}

func (n *MultipartInitiatorNode) GenerateOp() (exec.Op, error) {
return &MultipartInitiator{
StorageID: n.StorageID,
Storage: n.Storage,
UploadArgs: n.UploadArgsVar().VarID,
UploadedParts: n.InputValues().GetVarIDsStart(1),
BypassFileOutput: n.BypassFileInfoVar().VarID,


+ 1
- 1
common/pkgs/ioswitch2/ops2/ops.go View File

@@ -23,7 +23,7 @@ type FromNode interface {
type ToNode interface {
dag.Node
GetTo() ioswitch2.To
Input() dag.StreamOutputSlot
Input() dag.StreamInputSlot
SetInput(input *dag.StreamVar)
}



+ 2
- 2
common/pkgs/ioswitch2/ops2/shard_store.go View File

@@ -185,8 +185,8 @@ func (t *ShardWriteNode) SetInput(input *dag.StreamVar) {
input.To(t, 0)
}

func (t *ShardWriteNode) Input() dag.StreamOutputSlot {
return dag.StreamOutputSlot{
func (t *ShardWriteNode) Input() dag.StreamInputSlot {
return dag.StreamInputSlot{
Node: t,
Index: 0,
}


+ 2
- 2
common/pkgs/ioswitch2/ops2/shared_store.go View File

@@ -95,8 +95,8 @@ func (t *SharedLoadNode) SetInput(input *dag.StreamVar) {
input.To(t, 0)
}

func (t *SharedLoadNode) Input() dag.StreamOutputSlot {
return dag.StreamOutputSlot{
func (t *SharedLoadNode) Input() dag.StreamInputSlot {
return dag.StreamInputSlot{
Node: t,
Index: 0,
}


+ 48
- 10
common/pkgs/ioswitch2/parser/parser.go View File

@@ -107,7 +107,7 @@ func Parse(ft ioswitch2.FromTo, blder *exec.PlanBuilder) error {
removeUnusedFromNode(&ctx)
useMultipartUploadToShardStore(&ctx)
dropUnused(&ctx)
storeIPFSWriteResult(&ctx)
storeShardWriteResult(&ctx)
generateRange(&ctx)
generateClone(&ctx)

@@ -514,8 +514,15 @@ func fixSegmentSplit(ctx *ParseContext) error {
startSeg, endSeg := ctx.Ft.SegmentParam.CalcSegmentRange(ctx.StreamRange.Offset, strEnd)

// 关闭超出范围的分段
for i := endSeg; i < len(node.Segments); i++ {
node.OutputStreams().Get(i).ClearAllDst()
}
node.OutputStreams().Slots.RemoveRange(endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg)
node.Segments = lo2.RemoveRange(node.Segments, endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg)

for i := 0; i < startSeg; i++ {
node.OutputStreams().Get(i).ClearAllDst()
}
node.OutputStreams().Slots.RemoveRange(0, startSeg)
node.Segments = lo2.RemoveRange(node.Segments, 0, startSeg)

@@ -548,9 +555,16 @@ func fixSegmentJoin(ctx *ParseContext) error {
startSeg, endSeg := ctx.Ft.SegmentParam.CalcSegmentRange(start, end)

// 关闭超出范围的分段
node.OutputStreams().Slots.RemoveRange(endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg)
for i := endSeg; i < len(node.Segments); i++ {
node.InputStreams().Get(i).NotTo(node)
}
node.InputStreams().Slots.RemoveRange(endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg)
node.Segments = lo2.RemoveRange(node.Segments, endSeg, ctx.Ft.SegmentParam.SegmentCount()-endSeg)
node.OutputStreams().Slots.RemoveRange(0, startSeg)

for i := 0; i < startSeg; i++ {
node.InputStreams().Get(i).NotTo(node)
}
node.InputStreams().Slots.RemoveRange(0, startSeg)
node.Segments = lo2.RemoveRange(node.Segments, 0, startSeg)

// StreamRange开始的位置可能在某个分段的中间,此时这个分段的大小等于流开始位置到分段结束位置的距离
@@ -853,7 +867,7 @@ func removeUnusedFromNode(ctx *ParseContext) {
return true
}

if node.Output().Var() == nil {
if node.Output().Var().Dst.Len() == 0 {
ctx.DAG.RemoveNode(node)
}
return true
@@ -887,11 +901,19 @@ func useMultipartUploadToShardStore(ctx *ParseContext) {
return true
}

// Join的目的地必须支持TempStore和MultipartUpload功能才能替换成分片上传
if utils.FindFeature[*cdssdk.TempStore](shardNode.Storage) == nil {
// SegmentJoin的输出流的范围必须与ToShardStore的输入流的范围相同,
// 虽然可以通过调整SegmentJoin的输入流来调整范围,但太复杂,暂不支持
toStrIdx := shardNode.GetTo().GetStreamIndex()
toStrRng := shardNode.GetTo().GetRange()
if toStrIdx.IsRaw() {
if !toStrRng.Equals(ctx.StreamRange) {
return true
}
} else {
return true
}

// Join的目的地必须支持MultipartUpload功能才能替换成分片上传
multiUpload := utils.FindFeature[*cdssdk.MultipartUploadFeature](shardNode.Storage)
if multiUpload == nil {
return true
@@ -905,13 +927,12 @@ func useMultipartUploadToShardStore(ctx *ParseContext) {
}
}

initNode := ctx.DAG.NewMultipartInitiator(shardNode.Storage.Storage.StorageID)
initNode := ctx.DAG.NewMultipartInitiator(shardNode.Storage)
initNode.Env().CopyFrom(shardNode.Env())

partNumber := 1
for i, size := range joinNode.Segments {
joinInput := joinNode.InputSlot(i)
joinInput.Var().NotTo(joinNode)

if size > multiUpload.MaxPartSize {
// 如果一个分段的大小大于最大分片大小,则需要拆分为多个小段上传
@@ -944,9 +965,11 @@ func useMultipartUploadToShardStore(ctx *ParseContext) {

partNumber++
}

joinInput.Var().NotTo(joinNode)
}

bypassNode := ctx.DAG.NewBypassToShardStore(shardNode.Storage.Storage.StorageID)
bypassNode := ctx.DAG.NewBypassToShardStore(shardNode.Storage.Storage.StorageID, shardNode.FileHashStoreKey)
bypassNode.Env().CopyFrom(shardNode.Env())

// 分片上传Node产生的结果送到bypassNode,bypassNode将处理结果再送回分片上传Node
@@ -956,12 +979,15 @@ func useMultipartUploadToShardStore(ctx *ParseContext) {
// 最后删除Join指令和ToShardStore指令
ctx.DAG.RemoveNode(joinNode)
ctx.DAG.RemoveNode(shardNode)
// 因为ToShardStore已经被替换,所以对应的To也要删除。
// 虽然会跳过后续的Range过程,但由于之前做的流范围判断,不加Range也可以
ctx.Ft.Toes = lo2.Remove(ctx.Ft.Toes, shardNode.GetTo())
return true
})
}

// 为IPFS写入指令存储结果
func storeIPFSWriteResult(ctx *ParseContext) {
func storeShardWriteResult(ctx *ParseContext) {
dag.WalkOnlyType[*ops2.ShardWriteNode](ctx.DAG.Graph, func(n *ops2.ShardWriteNode) bool {
if n.FileHashStoreKey == "" {
return true
@@ -973,6 +999,18 @@ func storeIPFSWriteResult(ctx *ParseContext) {
storeNode.Store(n.FileHashStoreKey, n.FileHashVar())
return true
})

dag.WalkOnlyType[*ops2.BypassToShardStoreNode](ctx.DAG.Graph, func(n *ops2.BypassToShardStoreNode) bool {
if n.FileHashStoreKey == "" {
return true
}

storeNode := ctx.DAG.NewStore()
storeNode.Env().ToEnvDriver()

storeNode.Store(n.FileHashStoreKey, n.FileHashVar())
return true
})
}

// 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回


+ 16
- 27
common/pkgs/storage/factory/factory.go View File

@@ -1,40 +1,23 @@
package factory

import (
"fmt"
"reflect"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/reflect2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

// 创建一个在MasterHub上长期运行的存储服务
type StorageServiceBuilder func(detail stgmod.StorageDetail) (types.StorageService, error)

// 根据存储服务信息创建一个指定类型的组件
type StorageComponentBuilder func(detail stgmod.StorageDetail, typ reflect.Type) (any, error)

type StorageBuilder struct {
CreateService StorageServiceBuilder
CreateComponent StorageComponentBuilder
}

var storageBuilders = make(map[reflect.Type]StorageBuilder)

// 注册针对指定存储服务类型的Builder
func RegisterBuilder[T cdssdk.StorageType](createSvc StorageServiceBuilder, createComp StorageComponentBuilder) {
storageBuilders[reflect2.TypeOf[T]()] = StorageBuilder{
CreateService: createSvc,
CreateComponent: createComp,
}
}
// 需要导入所有存储服务的包
_ "gitlink.org.cn/cloudream/storage/common/pkgs/storage/local"
)

func CreateService(detail stgmod.StorageDetail) (types.StorageService, error) {
typ := reflect.TypeOf(detail.Storage.Type)
bld, ok := storageBuilders[typ]
bld, ok := reg.StorageBuilders[typ]
if !ok {
return nil, types.ErrUnsupportedStorageType
return nil, fmt.Errorf("unsupported storage type: %T", detail.Storage.Type)
}

return bld.CreateService(detail)
@@ -42,10 +25,10 @@ func CreateService(detail stgmod.StorageDetail) (types.StorageService, error) {

func CreateComponent[T any](detail stgmod.StorageDetail) (T, error) {
typ := reflect.TypeOf(detail.Storage.Type)
bld, ok := storageBuilders[typ]
bld, ok := reg.StorageBuilders[typ]
if !ok {
var def T
return def, types.ErrUnsupportedStorageType
return def, fmt.Errorf("unsupported storage type: %T", detail.Storage.Type)
}

comp, err := bld.CreateComponent(detail, reflect2.TypeOf[T]())
@@ -54,5 +37,11 @@ func CreateComponent[T any](detail stgmod.StorageDetail) (T, error) {
return def, err
}

return comp.(T), nil
c, ok := comp.(T)
if !ok {
var def T
return def, fmt.Errorf("invalid component type: %T", comp)
}

return c, nil
}

+ 19
- 0
common/pkgs/storage/factory/reg/reg.go View File

@@ -0,0 +1,19 @@
package reg

import (
"reflect"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/reflect2"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

// StorageBuilders maps a concrete cdssdk.StorageType (keyed by its
// reflect.Type) to the builder able to create services and components for
// that storage kind. Entries are added via RegisterBuilder, typically from
// each storage implementation package's init().
var StorageBuilders = make(map[reflect.Type]types.StorageBuilder)

// RegisterBuilder registers the builder for the storage service type T.
// createSvc builds the long-running storage service hosted on a MasterHub;
// createComp builds typed components (e.g. multipart upload helpers) on demand.
func RegisterBuilder[T cdssdk.StorageType](createSvc types.StorageServiceBuilder, createComp types.StorageComponentBuilder) {
	StorageBuilders[reflect2.TypeOf[T]()] = types.StorageBuilder{
		CreateService: createSvc,
		CreateComponent: createComp,
	}
}

+ 31
- 3
common/pkgs/storage/local/local.go View File

@@ -2,16 +2,19 @@ package local

import (
"fmt"
"path/filepath"
"reflect"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/reflect2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/factory/reg"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils"
)

func init() {
factory.RegisterBuilder[*cdssdk.LocalStorageType](createService, createComponent)
reg.RegisterBuilder[*cdssdk.LocalStorageType](createService, createComponent)
}

func createService(detail stgmod.StorageDetail) (types.StorageService, error) {
@@ -49,5 +52,30 @@ func createService(detail stgmod.StorageDetail) (types.StorageService, error) {
}

func createComponent(detail stgmod.StorageDetail, typ reflect.Type) (any, error) {
return nil, types.ErrUnsupportedComponent
switch typ {
case reflect2.TypeOf[types.MultipartInitiator]():
feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](detail)
if feat == nil {
return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{})
}

absTempDir, err := filepath.Abs(feat.TempDir)
if err != nil {
return nil, fmt.Errorf("get abs temp dir %v: %v", feat.TempDir, err)
}

return &MultipartInitiator{
absTempDir: absTempDir,
}, nil

case reflect2.TypeOf[types.MultipartUploader]():
feat := utils.FindFeature[*cdssdk.MultipartUploadFeature](detail)
if feat == nil {
return nil, fmt.Errorf("feature %T not found", cdssdk.MultipartUploadFeature{})
}

return &MultipartUploader{}, nil
}

return nil, fmt.Errorf("unsupported component type %v", typ)
}

+ 136
- 0
common/pkgs/storage/local/multipart_upload.go View File

@@ -0,0 +1,136 @@
package local

import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"hash"
"io"
"os"
"path/filepath"
"strings"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/os2"
"gitlink.org.cn/cloudream/common/utils/sort2"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

// MultipartInitiator implements multipart upload on the local filesystem:
// every part is stored as its own file inside a per-upload temp directory,
// and JoinParts concatenates them into a single ".joined" file.
type MultipartInitiator struct {
	absTempDir     string // root temp directory; must be an absolute path
	tempFileName   string // random name generated by Initiate for this upload
	tempPartsDir   string // absTempDir/tempFileName; holds the individual part files
	joinedFilePath string // absTempDir/tempFileName.joined; the concatenated result
}

// Initiate prepares a new local multipart upload: it generates a random
// upload name, creates the directory that will hold the individual part
// files, and reserves a path for the final joined file.
//
// The parts directory path doubles as the UploadID in the returned state,
// so uploaders know where to write their part files.
func (i *MultipartInitiator) Initiate(ctx context.Context) (types.MultipartInitState, error) {
	i.tempFileName = os2.GenerateRandomFileName(10)
	i.tempPartsDir = filepath.Join(i.absTempDir, i.tempFileName)
	i.joinedFilePath = filepath.Join(i.absTempDir, i.tempFileName+".joined")

	// 0755 matches the permissions used for every other MkdirAll in this
	// package (e.g. the shard store's temp and block dirs); the previous
	// 0777 would have left the directory world-writable.
	err := os.MkdirAll(i.tempPartsDir, 0755)
	if err != nil {
		return types.MultipartInitState{}, err
	}

	return types.MultipartInitState{
		UploadID: i.tempPartsDir,
	}, nil
}

// JoinParts concatenates all uploaded parts, in ascending part-number order,
// into the joined file, hashing the bytes as they are written. It returns
// the joined file's path, total size, and SHA-256 hash (upper-case hex).
func (i *MultipartInitiator) JoinParts(ctx context.Context, parts []types.UploadedPartInfo) (types.BypassFileInfo, error) {
	// Parts may arrive in any order; the file must be assembled by part number.
	ordered := sort2.Sort(parts, func(l, r types.UploadedPartInfo) int {
		return l.PartNumber - r.PartNumber
	})

	dst, err := os.Create(i.joinedFilePath)
	if err != nil {
		return types.BypassFileInfo{}, err
	}
	defer dst.Close()

	var total int64
	hasher := sha256.New()
	for _, p := range ordered {
		n, err := i.writePart(p, dst, hasher)
		if err != nil {
			return types.BypassFileInfo{}, err
		}
		total += n
	}

	sum := hasher.Sum(nil)
	return types.BypassFileInfo{
		TempFilePath: dst.Name(),
		Size:         total,
		FileHash:     cdssdk.FileHash(strings.ToUpper(hex.EncodeToString(sum))),
	}, nil
}

// writePart appends one part file (whose path is recorded in partInfo.ETag)
// to the joined file, feeding the same bytes to hasher, and returns the
// number of bytes copied.
func (i *MultipartInitiator) writePart(partInfo types.UploadedPartInfo, joined *os.File, hasher hash.Hash) (int64, error) {
	src, err := os.Open(partInfo.ETag)
	if err != nil {
		return 0, err
	}
	defer src.Close()

	var written int64
	chunk := make([]byte, 32*1024)
	for {
		n, readErr := src.Read(chunk)
		if n > 0 {
			written += int64(n)
			// hash.Hash writes never fail, so this error is not checked.
			io2.WriteAll(hasher, chunk[:n])
			if err := io2.WriteAll(joined, chunk[:n]); err != nil {
				return 0, err
			}
		}
		if readErr == io.EOF {
			return written, nil
		}
		if readErr != nil {
			return 0, readErr
		}
	}
}

// Complete is called once the joined file has been consumed (e.g. moved into
// the shard store). All remaining temp artifacts can simply be removed, so
// it delegates to Abort, which ignores removal errors for already-moved files.
func (i *MultipartInitiator) Complete() {
	i.Abort()
}

// Abort removes the joined file (if present) and the per-upload parts
// directory. Errors are deliberately ignored: the paths may not have been
// created yet, or the joined file may already have been renamed away.
func (i *MultipartInitiator) Abort() {
	os.Remove(i.joinedFilePath)
	os.RemoveAll(i.tempPartsDir)
}

// MultipartUploader uploads a single part of a local multipart upload by
// writing it to its own file inside the upload's parts directory.
type MultipartUploader struct{}

// UploadPart stores one part as a standalone file named after its part
// number, inside the directory identified by init.UploadID. The returned
// ETag is the part file's path, which JoinParts later opens to reassemble
// the full object.
func (u *MultipartUploader) UploadPart(ctx context.Context, init types.MultipartInitState, partSize int64, partNumber int, stream io.Reader) (types.UploadedPartInfo, error) {
	dstPath := filepath.Join(init.UploadID, fmt.Sprintf("%v", partNumber))

	dst, err := os.Create(dstPath)
	if err != nil {
		return types.UploadedPartInfo{}, err
	}
	defer dst.Close()

	if _, err := io.Copy(dst, stream); err != nil {
		return types.UploadedPartInfo{}, err
	}

	return types.UploadedPartInfo{
		ETag:       dstPath,
		PartNumber: partNumber,
	}, nil
}

// Close releases resources held by the uploader. The local implementation
// holds none, so this is a no-op.
func (u *MultipartUploader) Close() {

}

+ 51
- 8
common/pkgs/storage/local/shard_store.go View File

@@ -27,22 +27,29 @@ const (
type ShardStore struct {
svc *Service
cfg cdssdk.LocalShardStorage
absRoot string
lock sync.Mutex
workingTempFiles map[string]bool
done chan any
}

// NewShardStore creates a ShardStore rooted at cfg.Root. The root is
// resolved to an absolute path up front so later path operations (temp
// files, block dirs) are unaffected by working-directory changes.
func NewShardStore(svc *Service, cfg cdssdk.LocalShardStorage) (*ShardStore, error) {
	absRoot, err := filepath.Abs(cfg.Root)
	if err != nil {
		return nil, fmt.Errorf("get abs root: %w", err)
	}

	return &ShardStore{
		svc:              svc,
		cfg:              cfg,
		absRoot:          absRoot,
		workingTempFiles: make(map[string]bool),
		done:             make(chan any, 1),
	}, nil
}

func (s *ShardStore) Start(ch *types.StorageEventChan) {
s.getLogger().Infof("component start, root: %v, max size: %v", s.cfg.Root, s.cfg.MaxSize)
s.getLogger().Infof("component start, root: %v, max size: %v", s.absRoot, s.cfg.MaxSize)

go func() {
removeTempTicker := time.NewTicker(time.Minute * 10)
@@ -65,7 +72,7 @@ func (s *ShardStore) removeUnusedTempFiles() {

log := s.getLogger()

entries, err := os.ReadDir(filepath.Join(s.cfg.Root, TempDir))
entries, err := os.ReadDir(filepath.Join(s.absRoot, TempDir))
if err != nil {
log.Warnf("read temp dir: %v", err)
return
@@ -86,7 +93,7 @@ func (s *ShardStore) removeUnusedTempFiles() {
continue
}

path := filepath.Join(s.cfg.Root, TempDir, entry.Name())
path := filepath.Join(s.absRoot, TempDir, entry.Name())
err = os.Remove(path)
if err != nil {
log.Warnf("remove temp file %v: %v", path, err)
@@ -125,7 +132,7 @@ func (s *ShardStore) createTempFile() (*os.File, error) {
s.lock.Lock()
defer s.lock.Unlock()

tmpDir := filepath.Join(s.cfg.Root, TempDir)
tmpDir := filepath.Join(s.absRoot, TempDir)

err := os.MkdirAll(tmpDir, 0755)
if err != nil {
@@ -285,7 +292,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) {

var infos []types.FileInfo

blockDir := filepath.Join(s.cfg.Root, BlocksDir)
blockDir := filepath.Join(s.absRoot, BlocksDir)
err := filepath.WalkDir(blockDir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
@@ -326,7 +333,7 @@ func (s *ShardStore) GC(avaiables []cdssdk.FileHash) error {

cnt := 0

blockDir := filepath.Join(s.cfg.Root, BlocksDir)
blockDir := filepath.Join(s.absRoot, BlocksDir)
err := filepath.WalkDir(blockDir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
@@ -370,14 +377,50 @@ func (s *ShardStore) Stats() types.Stats {
}
}

// BypassUploaded ingests a file produced outside the shard store (e.g. the
// joined result of a multipart upload) by renaming it into the block
// directory derived from its hash.
//
// info.FileHash must be non-empty because the destination path is derived
// from it. If a block with the same hash already exists, the temp file is
// left where it is (the caller's cleanup removes it) and the existing block
// is kept — a deduplicated upload.
//
// NOTE(review): os.Rename only works when TempFilePath is on the same
// filesystem as the store root — presumably guaranteed by how the temp dir
// is configured; confirm against the storage configuration.
func (s *ShardStore) BypassUploaded(info types.BypassFileInfo) error {
	if info.FileHash == "" {
		return fmt.Errorf("empty file hash is not allowed by this shard store")
	}

	s.lock.Lock()
	defer s.lock.Unlock()

	log := s.getLogger()

	log.Debugf("%v bypass uploaded, size: %v, hash: %v", info.TempFilePath, info.Size, info.FileHash)

	blockDir := s.getFileDirFromHash(info.FileHash)
	err := os.MkdirAll(blockDir, 0755)
	if err != nil {
		log.Warnf("make block dir %v: %v", blockDir, err)
		return fmt.Errorf("making block dir: %w", err)
	}

	// Move into place only if no block with this hash exists yet.
	newPath := filepath.Join(blockDir, string(info.FileHash))
	_, err = os.Stat(newPath)
	if os.IsNotExist(err) {
		err = os.Rename(info.TempFilePath, newPath)
		if err != nil {
			log.Warnf("rename %v to %v: %v", info.TempFilePath, newPath, err)
			return fmt.Errorf("rename file: %w", err)
		}

	} else if err != nil {
		log.Warnf("get file %v stat: %v", newPath, err)
		return fmt.Errorf("get file stat: %w", err)
	}

	return nil
}

func (s *ShardStore) getLogger() logger.Logger {
return logger.WithField("ShardStore", "Local").WithField("Storage", s.svc.Detail.Storage.String())
}

func (s *ShardStore) getFileDirFromHash(hash cdssdk.FileHash) string {
return filepath.Join(s.cfg.Root, BlocksDir, string(hash)[:2])
return filepath.Join(s.absRoot, BlocksDir, string(hash)[:2])
}

func (s *ShardStore) getFilePathFromHash(hash cdssdk.FileHash) string {
return filepath.Join(s.cfg.Root, BlocksDir, string(hash)[:2], string(hash))
return filepath.Join(s.absRoot, BlocksDir, string(hash)[:2], string(hash))
}

+ 1
- 0
common/pkgs/storage/obs/obs.go View File

@@ -0,0 +1 @@
package obs

+ 1
- 0
common/pkgs/storage/s3/shard_store.go View File

@@ -0,0 +1 @@
package s3

+ 1
- 4
common/pkgs/storage/types/bypass.go View File

@@ -4,13 +4,10 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

// type BypassWriter interface {
// Write(stream io.Reader) (string, error)
// }

type BypassFileInfo struct {
TempFilePath string
FileHash cdssdk.FileHash
Size int64
}

type BypassNotifier interface {


+ 8
- 10
common/pkgs/storage/types/s3_client.go View File

@@ -3,19 +3,21 @@ package types
import (
"context"
"io"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

type MultipartInitiator interface {
MultipartUploader
Initiate(ctx context.Context, objectName string) (MultipartInitState, error)
Complete(ctx context.Context, parts []UploadedPartInfo) (CompletedFileInfo, error)
// 启动一个分片上传
Initiate(ctx context.Context) (MultipartInitState, error)
// 所有分片上传完成后,合并分片
JoinParts(ctx context.Context, parts []UploadedPartInfo) (BypassFileInfo, error)
// 合成之后的文件已被使用
Complete()
// 取消上传。如果在调用Complete之前调用,则应该删除合并后的文件。如果已经调用Complete,则应该不做任何事情。
Abort()
}

type MultipartUploader interface {
UploadPart(ctx context.Context, init MultipartInitState, objectName string, partSize int64, partNumber int, stream io.Reader) (UploadedPartInfo, error)
UploadPart(ctx context.Context, init MultipartInitState, partSize int64, partNumber int, stream io.Reader) (UploadedPartInfo, error)
Close()
}

@@ -27,7 +29,3 @@ type UploadedPartInfo struct {
PartNumber int
ETag string
}

type CompletedFileInfo struct {
FileHash cdssdk.FileHash // 可以为空,为空代表获取不到FileHash值
}

+ 11
- 4
common/pkgs/storage/types/types.go View File

@@ -14,10 +14,6 @@ var ErrComponentNotFound = errors.New("component not found")

var ErrStorageExists = errors.New("storage already exists")

var ErrUnsupportedStorageType = errors.New("unsupported storage type")

var ErrUnsupportedComponent = errors.New("unsupported component type")

type StorageEvent interface{}

type StorageEventChan = async.UnboundChannel[StorageEvent]
@@ -29,3 +25,14 @@ type StorageService interface {
Start(ch *StorageEventChan)
Stop()
}

// 创建一个在MasterHub上长期运行的存储服务
type StorageServiceBuilder func(detail stgmod.StorageDetail) (StorageService, error)

// 根据存储服务信息创建一个指定类型的组件
type StorageComponentBuilder func(detail stgmod.StorageDetail, typ reflect.Type) (any, error)

type StorageBuilder struct {
CreateService StorageServiceBuilder
CreateComponent StorageComponentBuilder
}

Loading…
Cancel
Save