From 3fe245a05a09f784fe0081982bca46f964279b4e Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 31 Oct 2024 15:32:12 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8C=89=E5=AD=98=E5=82=A8=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E7=B1=BB=E5=9E=8B=E5=88=92=E5=88=86=E4=BB=A3=E7=A0=81=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/task/storage_load_package.go | 2 +- common/pkgs/ioswitch2/ops2/shard_store.go | 2 +- common/pkgs/ioswitch2/parser/parser.go | 2 +- common/pkgs/ioswitchlrc/ops2/shard_store.go | 2 +- common/pkgs/ioswitchlrc/parser/passes.go | 2 +- .../local/local.go => local/shard_store.go} | 35 ++++++----- .../{temp/local.go => local/temp_store.go} | 2 +- .../{shard/storages => }/local/writer.go | 12 ++-- common/pkgs/storage/mgr/create_shardstore.go | 4 +- common/pkgs/storage/mgr/mgr.go | 20 +++---- .../storage/shard/storages/utils/utils.go | 26 -------- common/pkgs/storage/shard/types/option.go | 58 ------------------ common/pkgs/storage/shared/shared.go | 7 --- common/pkgs/storage/temp/temp_store.go | 4 -- .../shardstore.go => types/shard_store.go} | 59 +++++++++++++++++-- common/pkgs/storage/types/shared_store.go | 5 ++ common/pkgs/storage/types/temp_store.go | 5 ++ common/pkgs/storage/utils/utils.go | 26 ++++++++ 18 files changed, 131 insertions(+), 142 deletions(-) rename common/pkgs/storage/{shard/storages/local/local.go => local/shard_store.go} (77%) rename common/pkgs/storage/{temp/local.go => local/temp_store.go} (94%) rename common/pkgs/storage/{shard/storages => }/local/writer.go (78%) delete mode 100644 common/pkgs/storage/shard/storages/utils/utils.go delete mode 100644 common/pkgs/storage/shard/types/option.go delete mode 100644 common/pkgs/storage/shared/shared.go delete mode 100644 common/pkgs/storage/temp/temp_store.go rename common/pkgs/storage/{shard/types/shardstore.go => types/shard_store.go} (58%) create mode 100644 common/pkgs/storage/types/shared_store.go create mode 100644 common/pkgs/storage/types/temp_store.go create mode 100644 common/pkgs/storage/utils/utils.go diff --git a/agent/internal/task/storage_load_package.go b/agent/internal/task/storage_load_package.go index f65bba0..2911444 100644 --- a/agent/internal/task/storage_load_package.go +++ b/agent/internal/task/storage_load_package.go @@ -22,7 +22,7 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" "gitlink.org.cn/cloudream/storage/common/pkgs/ec" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" "gitlink.org.cn/cloudream/storage/common/utils" ) diff --git a/common/pkgs/ioswitch2/ops2/shard_store.go b/common/pkgs/ioswitch2/ops2/shard_store.go index 9e1a8cf..610e325 100644 --- a/common/pkgs/ioswitch2/ops2/shard_store.go +++ b/common/pkgs/ioswitch2/ops2/shard_store.go @@ -11,7 +11,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) func init() { diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index b7cdc1d..8310d83 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -12,7 +12,7 @@ import ( "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) type DefaultParser struct { diff --git a/common/pkgs/ioswitchlrc/ops2/shard_store.go b/common/pkgs/ioswitchlrc/ops2/shard_store.go index 3dcb2d5..f21bebb 100644 --- a/common/pkgs/ioswitchlrc/ops2/shard_store.go +++ b/common/pkgs/ioswitchlrc/ops2/shard_store.go @@ -11,7 +11,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) func init() { diff --git a/common/pkgs/ioswitchlrc/parser/passes.go b/common/pkgs/ioswitchlrc/parser/passes.go index d202a7d..31172c0 100644 --- a/common/pkgs/ioswitchlrc/parser/passes.go +++ b/common/pkgs/ioswitchlrc/parser/passes.go @@ -10,7 +10,7 @@ import ( "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/ops2" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) // 计算输入流的打开范围。会把流的范围按条带大小取整 diff --git a/common/pkgs/storage/shard/storages/local/local.go b/common/pkgs/storage/local/shard_store.go similarity index 77% rename from common/pkgs/storage/shard/storages/local/local.go rename to common/pkgs/storage/local/shard_store.go index bf7bfb8..eb2f7e3 100644 --- a/common/pkgs/storage/shard/storages/local/local.go +++ b/common/pkgs/storage/local/shard_store.go @@ -11,9 +11,8 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/io2" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/storages/utils" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types" - stypes "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/utils" ) const ( @@ -21,35 +20,35 @@ const ( BlocksDir = "blocks" ) -type Local struct { +type ShardStore struct { cfg cdssdk.LocalShardStorage } -func New(stg cdssdk.Storage, cfg cdssdk.LocalShardStorage) (*Local, error) { +func NewShardStore(stg cdssdk.Storage, cfg cdssdk.LocalShardStorage) (*ShardStore, error) { _, ok := stg.Address.(*cdssdk.LocalStorageAddress) if !ok { return nil, fmt.Errorf("storage address(%T) is not local", stg) } - return &Local{ + return &ShardStore{ cfg: cfg, }, nil } -func (s *Local) Start(ch *stypes.StorageEventChan) { +func (s *ShardStore) Start(ch *types.StorageEventChan) { } -func (s *Local) Stop() { +func (s *ShardStore) Stop() { } -func (s *Local) New() types.Writer { +func (s *ShardStore) New() types.ShardWriter { file, err := os.CreateTemp(filepath.Join(s.cfg.Root, "tmp"), "tmp-*") if err != nil { - return utils.ErrorWriter(err) + return utils.ErrorShardWriter(err) } - return &Writer{ + return &ShardWriter{ path: filepath.Join(s.cfg.Root, "tmp", file.Name()), file: file, hasher: sha256.New(), @@ -58,7 +57,7 @@ func (s *Local) New() types.Writer { } // 使用F函数创建Option对象 -func (s *Local) Open(opt types.OpenOption) (io.ReadCloser, error) { +func (s *ShardStore) Open(opt types.OpenOption) (io.ReadCloser, error) { fileName := string(opt.FileHash) if len(fileName) < 2 { return nil, fmt.Errorf("invalid file name") @@ -85,7 +84,7 @@ func (s *Local) Open(opt types.OpenOption) (io.ReadCloser, error) { return file, nil } -func (s *Local) ListAll() ([]types.FileInfo, error) { +func (s *ShardStore) ListAll() ([]types.FileInfo, error) { var infos []types.FileInfo blockDir := filepath.Join(s.cfg.Root, BlocksDir) @@ -115,7 +114,7 @@ func (s *Local) ListAll() ([]types.FileInfo, error) { return infos, nil } -func (s *Local) Purge(removes []cdssdk.FileHash) error { +func (s *ShardStore) Purge(removes []cdssdk.FileHash) error { for _, hash := range removes { fileName := string(hash) @@ -130,19 +129,19 @@ func (s *Local) Purge(removes []cdssdk.FileHash) error { return nil } -func (s *Local) Stats() types.Stats { +func (s *ShardStore) Stats() types.Stats { // TODO 统计本地存储的相关信息 return types.Stats{ Status: types.StatusOK, } } -func (s *Local) onWritterAbort(w *Writer) { +func (s *ShardStore) onWritterAbort(w *ShardWriter) { logger.Debugf("writting file %v aborted", w.path) s.removeTempFile(w.path) } -func (s *Local) onWritterFinish(w *Writer, hash cdssdk.FileHash) (types.FileInfo, error) { +func (s *ShardStore) onWritterFinish(w *ShardWriter, hash cdssdk.FileHash) (types.FileInfo, error) { logger.Debugf("write file %v finished, size: %v, hash: %v", w.path, w.size, hash) blockDir := filepath.Join(s.cfg.Root, BlocksDir, string(hash)[:2]) @@ -168,7 +167,7 @@ func (s *Local) onWritterFinish(w *Writer, hash cdssdk.FileHash) (types.FileInfo }, nil } -func (s *Local) removeTempFile(path string) { +func (s *ShardStore) removeTempFile(path string) { err := os.Remove(path) if err != nil { logger.Warnf("removing temp file %v: %v", path, err) diff --git a/common/pkgs/storage/temp/local.go b/common/pkgs/storage/local/temp_store.go similarity index 94% rename from common/pkgs/storage/temp/local.go rename to common/pkgs/storage/local/temp_store.go index 91de4ee..843c66f 100644 --- a/common/pkgs/storage/temp/local.go +++ b/common/pkgs/storage/local/temp_store.go @@ -1,4 +1,4 @@ -package tempstore +package local import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" diff --git a/common/pkgs/storage/shard/storages/local/writer.go b/common/pkgs/storage/local/writer.go similarity index 78% rename from common/pkgs/storage/shard/storages/local/writer.go rename to common/pkgs/storage/local/writer.go index df9b9ef..8994ae8 100644 --- a/common/pkgs/storage/shard/storages/local/writer.go +++ b/common/pkgs/storage/local/writer.go @@ -8,19 +8,19 @@ import ( "strings" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) -type Writer struct { +type ShardWriter struct { path string file *os.File hasher hash.Hash size int64 closed bool - owner *Local + owner *ShardStore } -func (w *Writer) Write(data []byte) (int, error) { +func (w *ShardWriter) Write(data []byte) (int, error) { n, err := w.file.Write(data) if err != nil { return 0, err @@ -32,7 +32,7 @@ func (w *Writer) Write(data []byte) (int, error) { } // 取消写入 -func (w *Writer) Abort() error { +func (w *ShardWriter) Abort() error { if w.closed { return nil } @@ -44,7 +44,7 @@ func (w *Writer) Abort() error { } // 结束写入,获得文件哈希值 -func (w *Writer) Finish() (types.FileInfo, error) { +func (w *ShardWriter) Finish() (types.FileInfo, error) { if w.closed { return types.FileInfo{}, fmt.Errorf("stream closed") } diff --git a/common/pkgs/storage/mgr/create_shardstore.go b/common/pkgs/storage/mgr/create_shardstore.go index 14879eb..ba11ae5 100644 --- a/common/pkgs/storage/mgr/create_shardstore.go +++ b/common/pkgs/storage/mgr/create_shardstore.go @@ -5,14 +5,14 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/storages/local" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/local" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) func createShardStore(detail stgmod.StorageDetail, ch *types.StorageEventChan, stg *storage) error { switch confg := detail.Shard.Config.(type) { case *cdssdk.LocalShardStorage: - store, err := local.New(detail.Storage, *confg) + store, err := local.NewShardStore(detail.Storage, *confg) if err != nil { return fmt.Errorf("new local shard store: %v", err) } diff --git a/common/pkgs/storage/mgr/mgr.go b/common/pkgs/storage/mgr/mgr.go index 3e114c5..95b2e8b 100644 --- a/common/pkgs/storage/mgr/mgr.go +++ b/common/pkgs/storage/mgr/mgr.go @@ -9,9 +9,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/reflect2" stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shared" - stypes "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" + "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) var ErrStorageNotFound = errors.New("storage not found") @@ -22,20 +20,20 @@ var ErrStorageExists = errors.New("storage already exists") type storage struct { Shard types.ShardStore - Shared shared.SharedStore - Components []stypes.StorageComponent + Shared types.SharedStore + Components []types.StorageComponent } type Manager struct { storages map[cdssdk.StorageID]*storage lock sync.Mutex - eventChan *stypes.StorageEventChan + eventChan *types.StorageEventChan } func NewManager() *Manager { return &Manager{ storages: make(map[cdssdk.StorageID]*storage), - eventChan: async.NewUnboundChannel[stypes.StorageEvent](), + eventChan: async.NewUnboundChannel[types.StorageEvent](), } } @@ -108,7 +106,7 @@ func (m *Manager) GetShardStore(stgID cdssdk.StorageID) (types.ShardStore, error } // 查找指定Storage的SharedStore组件 -func (m *Manager) GetSharedStore(stgID cdssdk.StorageID) (shared.SharedStore, error) { +func (m *Manager) GetSharedStore(stgID cdssdk.StorageID) (types.SharedStore, error) { m.lock.Lock() defer m.lock.Unlock() @@ -125,7 +123,7 @@ func (m *Manager) GetSharedStore(stgID cdssdk.StorageID) (shared.SharedStore, er } // 查找指定Storage的指定类型的组件,可以是ShardStore、SharedStore、或者其他自定义的组件 -func (m *Manager) GetComponent(stgID cdssdk.StorageID, typ reflect.Type) (stypes.StorageComponent, error) { +func (m *Manager) GetComponent(stgID cdssdk.StorageID, typ reflect.Type) (types.StorageComponent, error) { m.lock.Lock() defer m.lock.Unlock() @@ -141,7 +139,7 @@ func (m *Manager) GetComponent(stgID cdssdk.StorageID, typ reflect.Type) (stypes } return stg.Shard, nil - case reflect2.TypeOf[shared.SharedStore](): + case reflect2.TypeOf[types.SharedStore](): if stg.Shared == nil { return nil, ErrComponentNotFound } @@ -158,7 +156,7 @@ func (m *Manager) GetComponent(stgID cdssdk.StorageID, typ reflect.Type) (stypes } } -func GetComponent[T stypes.StorageComponent](mgr *Manager, stgID cdssdk.StorageID) (T, error) { +func GetComponent[T types.StorageComponent](mgr *Manager, stgID cdssdk.StorageID) (T, error) { ret, err := mgr.GetComponent(stgID, reflect2.TypeOf[T]()) if err != nil { var def T diff --git a/common/pkgs/storage/shard/storages/utils/utils.go b/common/pkgs/storage/shard/storages/utils/utils.go deleted file mode 100644 index f2a3d91..0000000 --- a/common/pkgs/storage/shard/storages/utils/utils.go +++ /dev/null @@ -1,26 +0,0 @@ -package utils - -import "gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types" - -type errorWriter struct { - err error -} - -func (w *errorWriter) Write(data []byte) (int, error) { - return 0, w.err -} - -// 取消写入。要求允许在调用了Finish之后再调用此函数,且此时不应该有任何影响。 -// 方便defer机制 -func (w *errorWriter) Abort() error { - return w.err -} - -// 结束写入,获得文件哈希值 -func (w *errorWriter) Finish() (types.FileInfo, error) { - return types.FileInfo{}, w.err -} - -func ErrorWriter(err error) types.Writer { - return &errorWriter{err: err} -} diff --git a/common/pkgs/storage/shard/types/option.go b/common/pkgs/storage/shard/types/option.go deleted file mode 100644 index 1d15f0b..0000000 --- a/common/pkgs/storage/shard/types/option.go +++ /dev/null @@ -1,58 +0,0 @@ -package types - -import ( - "fmt" - - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" -) - -type OpenOption struct { - FileHash cdssdk.FileHash - Offset int64 - Length int64 -} - -func NewOpen(fileHash cdssdk.FileHash) OpenOption { - return OpenOption{ - FileHash: fileHash, - Offset: 0, - Length: -1, - } -} - -func (o *OpenOption) WithLength(len int64) OpenOption { - o.Length = len - return *o -} - -// [start, end],即包含end -func (o *OpenOption) WithRange(start int64, end int64) OpenOption { - o.Offset = start - o.Length = end - start + 1 - return *o -} - -func (o *OpenOption) WithNullableLength(offset int64, length *int64) { - o.Offset = offset - if length != nil { - o.Length = *length - } -} - -func (o *OpenOption) String() string { - rangeStart := "" - if o.Offset > 0 { - rangeStart = fmt.Sprintf("%d", o.Offset) - } - - rangeEnd := "" - if o.Length >= 0 { - rangeEnd = fmt.Sprintf("%d", o.Offset+o.Length-1) - } - - if rangeStart == "" && rangeEnd == "" { - return string(o.FileHash) - } - - return fmt.Sprintf("%s[%s:%s]", string(o.FileHash), rangeStart, rangeEnd) -} diff --git a/common/pkgs/storage/shared/shared.go b/common/pkgs/storage/shared/shared.go deleted file mode 100644 index cb7b2e7..0000000 --- a/common/pkgs/storage/shared/shared.go +++ /dev/null @@ -1,7 +0,0 @@ -package shared - -import "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" - -type SharedStore interface { - types.StorageComponent -} diff --git a/common/pkgs/storage/temp/temp_store.go b/common/pkgs/storage/temp/temp_store.go deleted file mode 100644 index eea2fdc..0000000 --- a/common/pkgs/storage/temp/temp_store.go +++ /dev/null @@ -1,4 +0,0 @@ -package tempstore - -type TempStore interface { -} diff --git a/common/pkgs/storage/shard/types/shardstore.go b/common/pkgs/storage/types/shard_store.go similarity index 58% rename from common/pkgs/storage/shard/types/shardstore.go rename to common/pkgs/storage/types/shard_store.go index 701da6d..a80ad01 100644 --- a/common/pkgs/storage/shard/types/shardstore.go +++ b/common/pkgs/storage/types/shard_store.go @@ -1,10 +1,10 @@ package types import ( + "fmt" "io" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" ) type Status interface { @@ -23,9 +23,9 @@ type StoreEvent interface { } type ShardStore interface { - types.StorageComponent + StorageComponent // 准备写入一个新文件,写入后获得FileHash - New() Writer + New() ShardWriter // 使用F函数创建Option对象 Open(opt OpenOption) (io.ReadCloser, error) // 获取所有文件信息,尽量保证操作是原子的 @@ -61,7 +61,7 @@ type Stats struct { Description string } -type Writer interface { +type ShardWriter interface { io.Writer // 取消写入。要求允许在调用了Finish之后再调用此函数,且此时不应该有任何影响。 // 方便defer机制 @@ -69,3 +69,54 @@ type Writer interface { // 结束写入,获得文件哈希值 Finish() (FileInfo, error) } + +type OpenOption struct { + FileHash cdssdk.FileHash + Offset int64 + Length int64 +} + +func NewOpen(fileHash cdssdk.FileHash) OpenOption { + return OpenOption{ + FileHash: fileHash, + Offset: 0, + Length: -1, + } +} + +func (o *OpenOption) WithLength(len int64) OpenOption { + o.Length = len + return *o +} + +// [start, end],即包含end +func (o *OpenOption) WithRange(start int64, end int64) OpenOption { + o.Offset = start + o.Length = end - start + 1 + return *o +} + +func (o *OpenOption) WithNullableLength(offset int64, length *int64) { + o.Offset = offset + if length != nil { + o.Length = *length + } +} + +func (o *OpenOption) String() string { + rangeStart := "" + if o.Offset > 0 { + rangeStart = fmt.Sprintf("%d", o.Offset) + } + + rangeEnd := "" + if o.Length >= 0 { + rangeEnd = fmt.Sprintf("%d", o.Offset+o.Length-1) + } + + if rangeStart == "" && rangeEnd == "" { + return string(o.FileHash) + } + + return fmt.Sprintf("%s[%s:%s]", string(o.FileHash), rangeStart, rangeEnd) +} diff --git a/common/pkgs/storage/types/shared_store.go b/common/pkgs/storage/types/shared_store.go new file mode 100644 index 0000000..bab3f34 --- /dev/null +++ b/common/pkgs/storage/types/shared_store.go @@ -0,0 +1,5 @@ +package types + +type SharedStore interface { + StorageComponent +} diff --git a/common/pkgs/storage/types/temp_store.go b/common/pkgs/storage/types/temp_store.go new file mode 100644 index 0000000..d6dd620 --- /dev/null +++ b/common/pkgs/storage/types/temp_store.go @@ -0,0 +1,5 @@ +package types + +type TempStore interface { + StorageComponent +} diff --git a/common/pkgs/storage/utils/utils.go b/common/pkgs/storage/utils/utils.go new file mode 100644 index 0000000..f6941b8 --- /dev/null +++ b/common/pkgs/storage/utils/utils.go @@ -0,0 +1,26 @@ +package utils + +import "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types" + +type errorShardWriter struct { + err error +} + +func (w *errorShardWriter) Write(data []byte) (int, error) { + return 0, w.err +} + +// 取消写入。要求允许在调用了Finish之后再调用此函数,且此时不应该有任何影响。 +// 方便defer机制 +func (w *errorShardWriter) Abort() error { + return w.err +} + +// 结束写入,获得文件哈希值 +func (w *errorShardWriter) Finish() (types.FileInfo, error) { + return types.FileInfo{}, w.err +} + +func ErrorShardWriter(err error) types.ShardWriter { + return &errorShardWriter{err: err} +}