From 0f2232136ec7b8f0d0bdcd395891813b53f77839 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 22 Oct 2024 17:45:09 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8ShardStore=E6=9B=BF=E6=8D=A2I?= =?UTF-8?q?PFS?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/models/models.go | 6 + common/pkgs/ec/block.go | 354 ++++++++++---------- common/pkgs/ioswitch2/fromto.go | 8 +- common/pkgs/ioswitch2/http_hub_worker.go | 7 +- common/pkgs/ioswitch2/ops2/chunked.go | 15 +- common/pkgs/ioswitch2/ops2/clone.go | 13 +- common/pkgs/ioswitch2/ops2/ec.go | 14 +- common/pkgs/ioswitch2/ops2/file.go | 9 +- common/pkgs/ioswitch2/ops2/ipfs.go | 170 ---------- common/pkgs/ioswitch2/ops2/range.go | 9 +- common/pkgs/ioswitch2/ops2/shard_store.go | 188 +++++++++++ common/pkgs/ioswitch2/parser/parser.go | 37 +- common/pkgs/ioswitchlrc/fromto.go | 8 +- common/pkgs/ioswitchlrc/ops2/chunked.go | 15 +- common/pkgs/ioswitchlrc/ops2/clone.go | 13 +- common/pkgs/ioswitchlrc/ops2/ec.go | 7 +- common/pkgs/ioswitchlrc/ops2/ipfs.go | 170 ---------- common/pkgs/ioswitchlrc/ops2/range.go | 9 +- common/pkgs/ioswitchlrc/ops2/shard_store.go | 188 +++++++++++ common/pkgs/ioswitchlrc/parser/passes.go | 23 +- 20 files changed, 640 insertions(+), 623 deletions(-) delete mode 100644 common/pkgs/ioswitch2/ops2/ipfs.go create mode 100644 common/pkgs/ioswitch2/ops2/shard_store.go delete mode 100644 common/pkgs/ioswitchlrc/ops2/ipfs.go create mode 100644 common/pkgs/ioswitchlrc/ops2/shard_store.go diff --git a/common/models/models.go b/common/models/models.go index 20690af..b11fb3d 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -114,3 +114,9 @@ type ObjectAccessStat struct { Amount float64 `db:"Amount" json:"Amount"` // 前一日的读取量的滑动平均值 Counter float64 `db:"Counter" json:"counter"` // 当日的读取量 } + +type StorageDetail struct { + Storage cdssdk.Storage `json:"storage"` + Shard cdssdk.ShardStorage `json:"shard"` + Shared 
cdssdk.SharedStorage `json:"shared"` +} diff --git a/common/pkgs/ec/block.go b/common/pkgs/ec/block.go index 7b6d019..34a47af 100644 --- a/common/pkgs/ec/block.go +++ b/common/pkgs/ec/block.go @@ -1,190 +1,190 @@ package ec -import ( - "errors" - "io" - "io/ioutil" +// import ( +// "errors" +// "io" +// "io/ioutil" - "gitlink.org.cn/cloudream/common/pkgs/ipfs" - "gitlink.org.cn/cloudream/common/pkgs/logger" - stgglb "gitlink.org.cn/cloudream/storage/common/globals" -) +// "gitlink.org.cn/cloudream/common/pkgs/ipfs" +// "gitlink.org.cn/cloudream/common/pkgs/logger" +// stgglb "gitlink.org.cn/cloudream/storage/common/globals" +// ) -type BlockReader struct { - ipfsCli *ipfs.PoolClient - /*将文件分块相关的属性*/ - //fileHash - fileHash string - //fileSize - fileSize int64 - //ecK将文件的分块数 - ecK int - //chunkSize - chunkSize int64 +// type BlockReader struct { +// ipfsCli *ipfs.PoolClient +// /*将文件分块相关的属性*/ +// //fileHash +// fileHash string +// //fileSize +// fileSize int64 +// //ecK将文件的分块数 +// ecK int +// //chunkSize +// chunkSize int64 - /*可选项*/ - //fastRead,true的时候直接通过hash读block - jumpReadOpt bool -} +// /*可选项*/ +// //fastRead,true的时候直接通过hash读block +// jumpReadOpt bool +// } -func NewBlockReader() (*BlockReader, error) { - ipfsClient, err := stgglb.IPFSPool.Acquire() - if err != nil { - return nil, err - } - //default:fast模式,通过hash直接获取 - return &BlockReader{ipfsCli: ipfsClient, chunkSize: 256 * 1024, jumpReadOpt: false}, nil -} +// func NewBlockReader() (*BlockReader, error) { +// ipfsClient, err := stgglb.IPFSPool.Acquire() +// if err != nil { +// return nil, err +// } +// //default:fast模式,通过hash直接获取 +// return &BlockReader{ipfsCli: ipfsClient, chunkSize: 256 * 1024, jumpReadOpt: false}, nil +// } -func (r *BlockReader) Close() { - r.ipfsCli.Close() -} +// func (r *BlockReader) Close() { +// r.ipfsCli.Close() +// } -func (r *BlockReader) SetJumpRead(fileHash string, fileSize int64, ecK int) { - r.fileHash = fileHash - r.fileSize = fileSize - r.ecK = ecK - r.jumpReadOpt = true 
-} +// func (r *BlockReader) SetJumpRead(fileHash string, fileSize int64, ecK int) { +// r.fileHash = fileHash +// r.fileSize = fileSize +// r.ecK = ecK +// r.jumpReadOpt = true +// } -func (r *BlockReader) SetchunkSize(size int64) { - r.chunkSize = size -} +// func (r *BlockReader) SetchunkSize(size int64) { +// r.chunkSize = size +// } -func (r *BlockReader) FetchBLock(blockHash string) (io.ReadCloser, error) { - return r.ipfsCli.OpenRead(blockHash) -} +// func (r *BlockReader) FetchBLock(blockHash string) (io.ReadCloser, error) { +// return r.ipfsCli.OpenRead(blockHash) +// } -func (r *BlockReader) FetchBLocks(blockHashs []string) ([]io.ReadCloser, error) { - readers := make([]io.ReadCloser, len(blockHashs)) - for i, hash := range blockHashs { - var err error - readers[i], err = r.ipfsCli.OpenRead(hash) - if err != nil { - return nil, err - } - } - return readers, nil -} +// func (r *BlockReader) FetchBLocks(blockHashs []string) ([]io.ReadCloser, error) { +// readers := make([]io.ReadCloser, len(blockHashs)) +// for i, hash := range blockHashs { +// var err error +// readers[i], err = r.ipfsCli.OpenRead(hash) +// if err != nil { +// return nil, err +// } +// } +// return readers, nil +// } -func (r *BlockReader) JumpFetchBlock(innerID int) (io.ReadCloser, error) { - if !r.jumpReadOpt { - return nil, nil - } - pipeReader, pipeWriter := io.Pipe() - go func() { - for i := int64(r.chunkSize * int64(innerID)); i < r.fileSize; i += int64(r.ecK) * r.chunkSize { - reader, err := r.ipfsCli.OpenRead(r.fileHash, ipfs.ReadOption{Offset: i, Length: r.chunkSize}) - if err != nil { - pipeWriter.CloseWithError(err) - return - } - data, err := ioutil.ReadAll(reader) - if err != nil { - pipeWriter.CloseWithError(err) - return - } - reader.Close() - _, err = pipeWriter.Write(data) - if err != nil { - pipeWriter.CloseWithError(err) - return - } - } - //如果文件大小不是分块的整数倍,可能需要补0 - if r.fileSize%(r.chunkSize*int64(r.ecK)) != 0 { - //pktNum_1:chunkNum-1 - pktNum_1 := r.fileSize / 
(r.chunkSize * int64(r.ecK)) - offset := (r.fileSize - int64(pktNum_1)*int64(r.ecK)*r.chunkSize) - count0 := int64(innerID)*int64(r.ecK)*r.chunkSize - offset - if count0 > 0 { - add0 := make([]byte, count0) - pipeWriter.Write(add0) - } - } - pipeWriter.Close() - }() - return pipeReader, nil -} +// func (r *BlockReader) JumpFetchBlock(innerID int) (io.ReadCloser, error) { +// if !r.jumpReadOpt { +// return nil, nil +// } +// pipeReader, pipeWriter := io.Pipe() +// go func() { +// for i := int64(r.chunkSize * int64(innerID)); i < r.fileSize; i += int64(r.ecK) * r.chunkSize { +// reader, err := r.ipfsCli.OpenRead(r.fileHash, ipfs.ReadOption{Offset: i, Length: r.chunkSize}) +// if err != nil { +// pipeWriter.CloseWithError(err) +// return +// } +// data, err := ioutil.ReadAll(reader) +// if err != nil { +// pipeWriter.CloseWithError(err) +// return +// } +// reader.Close() +// _, err = pipeWriter.Write(data) +// if err != nil { +// pipeWriter.CloseWithError(err) +// return +// } +// } +// //如果文件大小不是分块的整数倍,可能需要补0 +// if r.fileSize%(r.chunkSize*int64(r.ecK)) != 0 { +// //pktNum_1:chunkNum-1 +// pktNum_1 := r.fileSize / (r.chunkSize * int64(r.ecK)) +// offset := (r.fileSize - int64(pktNum_1)*int64(r.ecK)*r.chunkSize) +// count0 := int64(innerID)*int64(r.ecK)*r.chunkSize - offset +// if count0 > 0 { +// add0 := make([]byte, count0) +// pipeWriter.Write(add0) +// } +// } +// pipeWriter.Close() +// }() +// return pipeReader, nil +// } -// FetchBlock1这个函数废弃了 -func (r *BlockReader) FetchBlock1(input interface{}, errMsg chan error) (io.ReadCloser, error) { - /*两种模式下传入第一个参数,但是input的类型不同: - jumpReadOpt-》true:传入blcokHash, string型,通过哈希直接读 - jumpReadOpt->false: 传入innerID,int型,选择需要获取的数据块的id - */ - var innerID int - var blockHash string - switch input.(type) { - case int: - // 执行针对整数的逻辑分支 - if r.jumpReadOpt { - return nil, errors.New("conflict, wrong input type and jumpReadOpt:true") - } else { - innerID = input.(int) - } - case string: - if !r.jumpReadOpt { - return nil, 
errors.New("conflict, wrong input type and jumpReadOpt:false") - } else { - blockHash = input.(string) - } - default: - return nil, errors.New("wrong input type") - } - //开始执行 - if r.jumpReadOpt { //快速读 - ipfsCli, err := stgglb.IPFSPool.Acquire() - if err != nil { - logger.Warnf("new ipfs client: %s", err.Error()) - return nil, err - } - defer ipfsCli.Close() - return ipfsCli.OpenRead(blockHash) - } else { //跳跃读 - ipfsCli, err := stgglb.IPFSPool.Acquire() - if err != nil { - logger.Warnf("new ipfs client: %s", err.Error()) - return nil, err - } - defer ipfsCli.Close() - pipeReader, pipeWriter := io.Pipe() - go func() { - for i := int64(r.chunkSize * int64(innerID)); i < r.fileSize; i += int64(r.ecK) * r.chunkSize { - reader, err := ipfsCli.OpenRead(r.fileHash, ipfs.ReadOption{i, r.chunkSize}) - if err != nil { - pipeWriter.Close() - errMsg <- err - return - } - data, err := ioutil.ReadAll(reader) - if err != nil { - pipeWriter.Close() - errMsg <- err - return - } - reader.Close() - _, err = pipeWriter.Write(data) - if err != nil { - pipeWriter.Close() - errMsg <- err - return - } - } - //如果文件大小不是分块的整数倍,可能需要补0 - if r.fileSize%(r.chunkSize*int64(r.ecK)) != 0 { - //pktNum_1:chunkNum-1 - pktNum_1 := r.fileSize / (r.chunkSize * int64(r.ecK)) - offset := (r.fileSize - int64(pktNum_1)*int64(r.ecK)*r.chunkSize) - count0 := int64(innerID)*int64(r.ecK)*r.chunkSize - offset - if count0 > 0 { - add0 := make([]byte, count0) - pipeWriter.Write(add0) - } - } - pipeWriter.Close() - errMsg <- nil - }() - return pipeReader, nil - } -} +// // FetchBlock1这个函数废弃了 +// func (r *BlockReader) FetchBlock1(input interface{}, errMsg chan error) (io.ReadCloser, error) { +// /*两种模式下传入第一个参数,但是input的类型不同: +// jumpReadOpt-》true:传入blcokHash, string型,通过哈希直接读 +// jumpReadOpt->false: 传入innerID,int型,选择需要获取的数据块的id +// */ +// var innerID int +// var blockHash string +// switch input.(type) { +// case int: +// // 执行针对整数的逻辑分支 +// if r.jumpReadOpt { +// return nil, errors.New("conflict, wrong input type and 
jumpReadOpt:true") +// } else { +// innerID = input.(int) +// } +// case string: +// if !r.jumpReadOpt { +// return nil, errors.New("conflict, wrong input type and jumpReadOpt:false") +// } else { +// blockHash = input.(string) +// } +// default: +// return nil, errors.New("wrong input type") +// } +// //开始执行 +// if r.jumpReadOpt { //快速读 +// ipfsCli, err := stgglb.IPFSPool.Acquire() +// if err != nil { +// logger.Warnf("new ipfs client: %s", err.Error()) +// return nil, err +// } +// defer ipfsCli.Close() +// return ipfsCli.OpenRead(blockHash) +// } else { //跳跃读 +// ipfsCli, err := stgglb.IPFSPool.Acquire() +// if err != nil { +// logger.Warnf("new ipfs client: %s", err.Error()) +// return nil, err +// } +// defer ipfsCli.Close() +// pipeReader, pipeWriter := io.Pipe() +// go func() { +// for i := int64(r.chunkSize * int64(innerID)); i < r.fileSize; i += int64(r.ecK) * r.chunkSize { +// reader, err := ipfsCli.OpenRead(r.fileHash, ipfs.ReadOption{i, r.chunkSize}) +// if err != nil { +// pipeWriter.Close() +// errMsg <- err +// return +// } +// data, err := ioutil.ReadAll(reader) +// if err != nil { +// pipeWriter.Close() +// errMsg <- err +// return +// } +// reader.Close() +// _, err = pipeWriter.Write(data) +// if err != nil { +// pipeWriter.Close() +// errMsg <- err +// return +// } +// } +// //如果文件大小不是分块的整数倍,可能需要补0 +// if r.fileSize%(r.chunkSize*int64(r.ecK)) != 0 { +// //pktNum_1:chunkNum-1 +// pktNum_1 := r.fileSize / (r.chunkSize * int64(r.ecK)) +// offset := (r.fileSize - int64(pktNum_1)*int64(r.ecK)*r.chunkSize) +// count0 := int64(innerID)*int64(r.ecK)*r.chunkSize - offset +// if count0 > 0 { +// add0 := make([]byte, count0) +// pipeWriter.Write(add0) +// } +// } +// pipeWriter.Close() +// errMsg <- nil +// }() +// return pipeReader, nil +// } +// } diff --git a/common/pkgs/ioswitch2/fromto.go b/common/pkgs/ioswitch2/fromto.go index dffb742..36d5cb1 100644 --- a/common/pkgs/ioswitch2/fromto.go +++ b/common/pkgs/ioswitch2/fromto.go @@ -3,6 +3,7 @@ package 
ioswitch2 import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/shardstore/types" ) type From interface { @@ -58,12 +59,13 @@ func (f *FromDriver) GetDataIndex() int { } type FromNode struct { - FileHash string - Node *cdssdk.Node + FileHash types.FileHash + Node cdssdk.Node + Storage cdssdk.Storage DataIndex int } -func NewFromNode(fileHash string, node *cdssdk.Node, dataIndex int) *FromNode { +func NewFromNode(fileHash types.FileHash, node cdssdk.Node, storage cdssdk.Storage, dataIndex int) *FromNode { return &FromNode{ FileHash: fileHash, Node: node, diff --git a/common/pkgs/ioswitch2/http_hub_worker.go b/common/pkgs/ioswitch2/http_hub_worker.go index e121779..c8658d5 100644 --- a/common/pkgs/ioswitch2/http_hub_worker.go +++ b/common/pkgs/ioswitch2/http_hub_worker.go @@ -8,6 +8,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/types" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" "gitlink.org.cn/cloudream/common/utils/serder" ) @@ -22,10 +23,10 @@ type HttpHubWorker struct { func (w *HttpHubWorker) NewClient() (exec.WorkerClient, error) { addressInfo := w.Node.Address.(*cdssdk.HttpAddressInfo) baseUrl := "http://" + addressInfo.ExternalIP + ":" + strconv.Itoa(addressInfo.Port) - config := cdssdk.Config{ + config := cdsapi.Config{ URL: baseUrl, } - pool := cdssdk.NewPool(&config) + pool := cdsapi.NewPool(&config) cli, err := pool.Acquire() defer pool.Release(cli) if err != nil { @@ -49,7 +50,7 @@ func (w *HttpHubWorker) Equals(worker exec.WorkerInfo) bool { } type HttpHubWorkerClient struct { - cli *cdssdk.Client + cli *cdsapi.Client } func (c *HttpHubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error { diff --git a/common/pkgs/ioswitch2/ops2/chunked.go b/common/pkgs/ioswitch2/ops2/chunked.go index b4087b9..b8b4a11 100644 
--- a/common/pkgs/ioswitch2/ops2/chunked.go +++ b/common/pkgs/ioswitch2/ops2/chunked.go @@ -1,7 +1,6 @@ package ops2 import ( - "context" "fmt" "io" @@ -26,8 +25,8 @@ type ChunkedSplit struct { PaddingZeros bool `json:"paddingZeros"` } -func (o *ChunkedSplit) Execute(ctx context.Context, e *exec.Executor) error { - err := e.BindVars(ctx, o.Input) +func (o *ChunkedSplit) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + err := e.BindVars(ctx.Context, o.Input) if err != nil { return err } @@ -39,7 +38,7 @@ func (o *ChunkedSplit) Execute(ctx context.Context, e *exec.Executor) error { sem := semaphore.NewWeighted(int64(len(outputs))) for i := range outputs { - sem.Acquire(ctx, 1) + sem.Acquire(ctx.Context, 1) o.Outputs[i].Stream = io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { sem.Release(1) @@ -47,7 +46,7 @@ func (o *ChunkedSplit) Execute(ctx context.Context, e *exec.Executor) error { } exec.PutArrayVars(e, o.Outputs) - return sem.Acquire(ctx, int64(len(outputs))) + return sem.Acquire(ctx.Context, int64(len(outputs))) } func (o *ChunkedSplit) String() string { @@ -66,8 +65,8 @@ type ChunkedJoin struct { ChunkSize int `json:"chunkSize"` } -func (o *ChunkedJoin) Execute(ctx context.Context, e *exec.Executor) error { - err := exec.BindArrayVars(e, ctx, o.Inputs) +func (o *ChunkedJoin) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + err := exec.BindArrayVars(e, ctx.Context, o.Inputs) if err != nil { return err } @@ -88,7 +87,7 @@ func (o *ChunkedJoin) Execute(ctx context.Context, e *exec.Executor) error { }) e.PutVars(o.Output) - return fut.Wait(ctx) + return fut.Wait(ctx.Context) } func (o *ChunkedJoin) String() string { diff --git a/common/pkgs/ioswitch2/ops2/clone.go b/common/pkgs/ioswitch2/ops2/clone.go index 66d1194..e195057 100644 --- a/common/pkgs/ioswitch2/ops2/clone.go +++ b/common/pkgs/ioswitch2/ops2/clone.go @@ -1,7 +1,6 @@ package ops2 import ( - "context" "fmt" "io" @@ -23,8 +22,8 @@ type CloneStream struct { Cloneds 
[]*exec.StreamVar `json:"cloneds"` } -func (o *CloneStream) Execute(ctx context.Context, e *exec.Executor) error { - err := e.BindVars(ctx, o.Raw) +func (o *CloneStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + err := e.BindVars(ctx.Context, o.Raw) if err != nil { return err } @@ -34,7 +33,7 @@ func (o *CloneStream) Execute(ctx context.Context, e *exec.Executor) error { sem := semaphore.NewWeighted(int64(len(o.Cloneds))) for i, s := range cloned { - sem.Acquire(ctx, 1) + sem.Acquire(ctx.Context, 1) o.Cloneds[i].Stream = io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) { sem.Release(1) @@ -42,7 +41,7 @@ func (o *CloneStream) Execute(ctx context.Context, e *exec.Executor) error { } exec.PutArrayVars(e, o.Cloneds) - return sem.Acquire(ctx, int64(len(o.Cloneds))) + return sem.Acquire(ctx.Context, int64(len(o.Cloneds))) } func (o *CloneStream) String() string { @@ -54,8 +53,8 @@ type CloneVar struct { Cloneds []exec.Var `json:"cloneds"` } -func (o *CloneVar) Execute(ctx context.Context, e *exec.Executor) error { - err := e.BindVars(ctx, o.Raw) +func (o *CloneVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + err := e.BindVars(ctx.Context, o.Raw) if err != nil { return err } diff --git a/common/pkgs/ioswitch2/ops2/ec.go b/common/pkgs/ioswitch2/ops2/ec.go index 246151b..0cca461 100644 --- a/common/pkgs/ioswitch2/ops2/ec.go +++ b/common/pkgs/ioswitch2/ops2/ec.go @@ -31,13 +31,13 @@ type ECReconstructAny struct { OutputBlockIndexes []int `json:"outputBlockIndexes"` } -func (o *ECReconstructAny) Execute(ctx context.Context, e *exec.Executor) error { +func (o *ECReconstructAny) Execute(ctx *exec.ExecContext, e *exec.Executor) error { rs, err := ec.NewStreamRs(o.EC.K, o.EC.N, o.EC.ChunkSize) if err != nil { return fmt.Errorf("new ec: %w", err) } - err = exec.BindArrayVars(e, ctx, o.Inputs) + err = exec.BindArrayVars(e, ctx.Context, o.Inputs) if err != nil { return err } @@ -56,7 +56,7 @@ func (o *ECReconstructAny) Execute(ctx 
context.Context, e *exec.Executor) error sem := semaphore.NewWeighted(int64(len(o.Outputs))) for i := range o.Outputs { - sem.Acquire(ctx, 1) + sem.Acquire(ctx.Context, 1) o.Outputs[i].Stream = io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { sem.Release(1) @@ -64,7 +64,7 @@ func (o *ECReconstructAny) Execute(ctx context.Context, e *exec.Executor) error } exec.PutArrayVars(e, o.Outputs) - return sem.Acquire(ctx, int64(len(o.Outputs))) + return sem.Acquire(ctx.Context, int64(len(o.Outputs))) } type ECReconstruct struct { @@ -117,8 +117,8 @@ type ECMultiply struct { ChunkSize int `json:"chunkSize"` } -func (o *ECMultiply) Execute(ctx context.Context, e *exec.Executor) error { - err := exec.BindArrayVars(e, ctx, o.Inputs) +func (o *ECMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + err := exec.BindArrayVars(e, ctx.Context, o.Inputs) if err != nil { return err } @@ -180,7 +180,7 @@ func (o *ECMultiply) Execute(ctx context.Context, e *exec.Executor) error { }() exec.PutArrayVars(e, o.Outputs) - err = fut.Wait(ctx) + err = fut.Wait(ctx.Context) if err != nil { for _, wr := range outputWrs { wr.CloseWithError(err) diff --git a/common/pkgs/ioswitch2/ops2/file.go b/common/pkgs/ioswitch2/ops2/file.go index 33dba33..0c8d79c 100644 --- a/common/pkgs/ioswitch2/ops2/file.go +++ b/common/pkgs/ioswitch2/ops2/file.go @@ -1,7 +1,6 @@ package ops2 import ( - "context" "fmt" "io" "os" @@ -23,8 +22,8 @@ type FileWrite struct { FilePath string `json:"filePath"` } -func (o *FileWrite) Execute(ctx context.Context, e *exec.Executor) error { - err := e.BindVars(ctx, o.Input) +func (o *FileWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + err := e.BindVars(ctx.Context, o.Input) if err != nil { return err } @@ -59,7 +58,7 @@ type FileRead struct { FilePath string `json:"filePath"` } -func (o *FileRead) Execute(ctx context.Context, e *exec.Executor) error { +func (o *FileRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { file, err 
:= os.Open(o.FilePath) if err != nil { return fmt.Errorf("opening file: %w", err) @@ -70,7 +69,7 @@ func (o *FileRead) Execute(ctx context.Context, e *exec.Executor) error { fut.SetVoid() }) e.PutVars(o.Output) - fut.Wait(ctx) + fut.Wait(ctx.Context) return nil } diff --git a/common/pkgs/ioswitch2/ops2/ipfs.go b/common/pkgs/ioswitch2/ops2/ipfs.go deleted file mode 100644 index f8e4e2a..0000000 --- a/common/pkgs/ioswitch2/ops2/ipfs.go +++ /dev/null @@ -1,170 +0,0 @@ -package ops2 - -import ( - "context" - "fmt" - "io" - - "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/ipfs" - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/utils/io2" - stgglb "gitlink.org.cn/cloudream/storage/common/globals" -) - -func init() { - exec.UseOp[*IPFSRead]() - exec.UseOp[*IPFSWrite]() -} - -type IPFSRead struct { - Output *exec.StreamVar `json:"output"` - FileHash string `json:"fileHash"` - Option ipfs.ReadOption `json:"option"` -} - -func (o *IPFSRead) Execute(ctx context.Context, e *exec.Executor) error { - logger. - WithField("FileHash", o.FileHash). 
- Debugf("ipfs read op") - defer logger.Debugf("ipfs read op finished") - - ipfsCli, err := stgglb.IPFSPool.Acquire() - if err != nil { - return fmt.Errorf("new ipfs client: %w", err) - } - defer stgglb.IPFSPool.Release(ipfsCli) - - file, err := ipfsCli.OpenRead(o.FileHash, o.Option) - if err != nil { - return fmt.Errorf("reading ipfs: %w", err) - } - defer file.Close() - - fut := future.NewSetVoid() - o.Output.Stream = io2.AfterReadClosedOnce(file, func(closer io.ReadCloser) { - fut.SetVoid() - }) - e.PutVars(o.Output) - - return fut.Wait(ctx) -} - -func (o *IPFSRead) String() string { - return fmt.Sprintf("IPFSRead %v -> %v", o.FileHash, o.Output.ID) -} - -type IPFSWrite struct { - Input *exec.StreamVar `json:"input"` - FileHash *exec.StringVar `json:"fileHash"` -} - -func (o *IPFSWrite) Execute(ctx context.Context, e *exec.Executor) error { - logger. - WithField("Input", o.Input.ID). - WithField("FileHashVar", o.FileHash.ID). - Debugf("ipfs write op") - - ipfsCli, err := stgglb.IPFSPool.Acquire() - if err != nil { - return fmt.Errorf("new ipfs client: %w", err) - } - defer stgglb.IPFSPool.Release(ipfsCli) - - err = e.BindVars(ctx, o.Input) - if err != nil { - return err - } - defer o.Input.Stream.Close() - - o.FileHash.Value, err = ipfsCli.CreateFile(o.Input.Stream) - if err != nil { - return fmt.Errorf("creating ipfs file: %w", err) - } - - e.PutVars(o.FileHash) - - return nil -} - -func (o *IPFSWrite) String() string { - return fmt.Sprintf("IPFSWrite %v -> %v", o.Input.ID, o.FileHash.ID) -} - -type IPFSReadNode struct { - dag.NodeBase - FileHash string - Option ipfs.ReadOption -} - -func (b *GraphNodeBuilder) NewIPFSRead(fileHash string, option ipfs.ReadOption) *IPFSReadNode { - node := &IPFSReadNode{ - FileHash: fileHash, - Option: option, - } - b.AddNode(node) - node.OutputStreams().SetupNew(node, b.NewStreamVar()) - return node -} - -func (t *IPFSReadNode) Output() dag.StreamSlot { - return dag.StreamSlot{ - Var: t.OutputStreams().Get(0), - Index: 0, - } -} 
- -func (t *IPFSReadNode) GenerateOp() (exec.Op, error) { - return &IPFSRead{ - Output: t.OutputStreams().Get(0).Var, - FileHash: t.FileHash, - Option: t.Option, - }, nil -} - -// func (t *IPFSReadType) String() string { -// return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node)) -// } - -type IPFSWriteNode struct { - dag.NodeBase - FileHashStoreKey string -} - -func (b *GraphNodeBuilder) NewIPFSWrite(fileHashStoreKey string) *IPFSWriteNode { - node := &IPFSWriteNode{ - FileHashStoreKey: fileHashStoreKey, - } - b.AddNode(node) - return node -} - -func (t *IPFSWriteNode) SetInput(input *dag.StreamVar) { - t.InputStreams().EnsureSize(1) - input.Connect(t, 0) - t.OutputValues().SetupNew(t, t.Graph().NewValueVar(dag.StringValueVar)) -} - -func (t *IPFSWriteNode) Input() dag.StreamSlot { - return dag.StreamSlot{ - Var: t.InputStreams().Get(0), - Index: 0, - } -} - -func (t *IPFSWriteNode) FileHashVar() *dag.ValueVar { - return t.OutputValues().Get(0) -} - -func (t *IPFSWriteNode) GenerateOp() (exec.Op, error) { - return &IPFSWrite{ - Input: t.InputStreams().Get(0).Var, - FileHash: t.OutputValues().Get(0).Var.(*exec.StringVar), - }, nil -} - -// func (t *IPFSWriteType) String() string { -// return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) -// } diff --git a/common/pkgs/ioswitch2/ops2/range.go b/common/pkgs/ioswitch2/ops2/range.go index 1475b4e..90e6f5c 100644 --- a/common/pkgs/ioswitch2/ops2/range.go +++ b/common/pkgs/ioswitch2/ops2/range.go @@ -1,7 +1,6 @@ package ops2 import ( - "context" "fmt" "io" @@ -23,8 +22,8 @@ type Range struct { Length *int64 `json:"length"` } -func (o *Range) Execute(ctx context.Context, e *exec.Executor) error { - err := e.BindVars(ctx, o.Input) +func (o *Range) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + err := e.BindVars(ctx.Context, o.Input) if err != nil { 
return err } @@ -54,7 +53,7 @@ func (o *Range) Execute(ctx context.Context, e *exec.Executor) error { }) e.PutVars(o.Output) - return fut.Wait(ctx) + return fut.Wait(ctx.Context) } o.Output.Stream = io2.AfterEOF(io2.Length(o.Input.Stream, *o.Length), func(closer io.ReadCloser, err error) { @@ -62,7 +61,7 @@ func (o *Range) Execute(ctx context.Context, e *exec.Executor) error { }) e.PutVars(o.Output) - err = fut.Wait(ctx) + err = fut.Wait(ctx.Context) if err != nil { return err } diff --git a/common/pkgs/ioswitch2/ops2/shard_store.go b/common/pkgs/ioswitch2/ops2/shard_store.go new file mode 100644 index 0000000..dfae8ad --- /dev/null +++ b/common/pkgs/ioswitch2/ops2/shard_store.go @@ -0,0 +1,188 @@ +package ops2 + +import ( + "fmt" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/storage/common/pkgs/shardstore/pool" + "gitlink.org.cn/cloudream/storage/common/pkgs/shardstore/types" +) + +func init() { + exec.UseOp[*ShardRead]() + exec.UseOp[*ShardWrite]() +} + +type ShardRead struct { + Output *exec.StreamVar `json:"output"` + StorageID cdssdk.StorageID `json:"storageID"` + Open types.OpenOption `json:"option"` +} + +func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + logger. + WithField("Open", o.Open). 
+ Debugf("reading from shard store") + defer logger.Debugf("reading from shard store finished") + + pool, err := exec.ValueByType[*pool.ShardStorePool](ctx) + if err != nil { + return fmt.Errorf("getting shard store pool: %w", err) + } + + store, err := pool.Get(o.StorageID) + if err != nil { + return fmt.Errorf("getting shard store %v: %w", o.StorageID, err) + } + + file, err := store.Open(o.Open) + if err != nil { + return fmt.Errorf("opening shard store file: %w", err) + } + + fut := future.NewSetVoid() + o.Output.Stream = io2.AfterReadClosedOnce(file, func(closer io.ReadCloser) { + fut.SetVoid() + }) + e.PutVars(o.Output) + + return fut.Wait(ctx.Context) +} + +func (o *ShardRead) String() string { + return fmt.Sprintf("ShardRead %v -> %v", o.Open, o.Output.ID) +} + +type ShardWrite struct { + Input *exec.StreamVar `json:"input"` + FileHash *exec.StringVar `json:"fileHash"` + StorageID cdssdk.StorageID `json:"storageID"` +} + +func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + logger. + WithField("Input", o.Input.ID). + WithField("FileHashVar", o.FileHash.ID). 
+ Debugf("writting file to shard store") + defer logger.Debugf("write to shard store finished") + + pool, err := exec.ValueByType[*pool.ShardStorePool](ctx) + if err != nil { + return fmt.Errorf("getting shard store pool: %w", err) + } + + store, err := pool.Get(o.StorageID) + if err != nil { + return fmt.Errorf("getting shard store %v: %w", o.StorageID, err) + } + + err = e.BindVars(ctx.Context, o.Input) + if err != nil { + return err + } + defer o.Input.Stream.Close() + + writer := store.New() + defer writer.Abort() + + _, err = io.Copy(writer, o.Input.Stream) + if err != nil { + return fmt.Errorf("writing file to shard store: %w", err) + } + + fileInfo, err := writer.Finish() + if err != nil { + return fmt.Errorf("finishing writing file to shard store: %w", err) + } + + o.FileHash.Value = string(fileInfo.Hash) + + e.PutVars(o.FileHash) + return nil +} + +func (o *ShardWrite) String() string { + return fmt.Sprintf("IPFSWrite %v -> %v", o.Input.ID, o.FileHash.ID) +} + +type ShardReadNode struct { + dag.NodeBase + StorageID cdssdk.StorageID + Open types.OpenOption +} + +func (b *GraphNodeBuilder) NewIPFSRead(stgID cdssdk.StorageID, open types.OpenOption) *ShardReadNode { + node := &ShardReadNode{ + StorageID: stgID, + Open: open, + } + b.AddNode(node) + node.OutputStreams().SetupNew(node, b.NewStreamVar()) + return node +} + +func (t *ShardReadNode) Output() dag.StreamSlot { + return dag.StreamSlot{ + Var: t.OutputStreams().Get(0), + Index: 0, + } +} + +func (t *ShardReadNode) GenerateOp() (exec.Op, error) { + return &ShardRead{ + Output: t.OutputStreams().Get(0).Var, + StorageID: t.StorageID, + Open: t.Open, + }, nil +} + +// func (t *IPFSReadType) String() string { +// return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node)) +// } + +type IPFSWriteNode struct { + dag.NodeBase + FileHashStoreKey string +} + +func (b *GraphNodeBuilder) NewIPFSWrite(fileHashStoreKey string) *IPFSWriteNode { 
+ node := &IPFSWriteNode{ + FileHashStoreKey: fileHashStoreKey, + } + b.AddNode(node) + return node +} + +func (t *IPFSWriteNode) SetInput(input *dag.StreamVar) { + t.InputStreams().EnsureSize(1) + input.Connect(t, 0) + t.OutputValues().SetupNew(t, t.Graph().NewValueVar(dag.StringValueVar)) +} + +func (t *IPFSWriteNode) Input() dag.StreamSlot { + return dag.StreamSlot{ + Var: t.InputStreams().Get(0), + Index: 0, + } +} + +func (t *IPFSWriteNode) FileHashVar() *dag.ValueVar { + return t.OutputValues().Get(0) +} + +func (t *IPFSWriteNode) GenerateOp() (exec.Op, error) { + return &ShardWrite{ + Input: t.InputStreams().Get(0).Var, + FileHash: t.OutputValues().Get(0).Var.(*exec.StringVar), + }, nil +} + +// func (t *IPFSWriteType) String() string { +// return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index dd5130d..746dc2a 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -7,12 +7,12 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan" - "gitlink.org.cn/cloudream/common/pkgs/ipfs" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/ops2" + "gitlink.org.cn/cloudream/storage/common/pkgs/shardstore/types" ) type DefaultParser struct { @@ -235,36 +235,25 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, f ioswitch2.From) (ops2 switch f := f.(type) { case *ioswitch2.FromNode: - t := ctx.DAG.NewIPFSRead(f.FileHash, ipfs.ReadOption{ - Offset: 0, - Length: -1, - }) + t := ctx.DAG.NewIPFSRead(f.Storage.StorageID, 
types.NewOpen(f.FileHash)) if f.DataIndex == -1 { - t.Option.Offset = repRange.Offset - if repRange.Length != nil { - t.Option.Length = *repRange.Length - } + t.Open.WithNullableLength(repRange.Offset, repRange.Length) } else { - t.Option.Offset = blkRange.Offset - if blkRange.Length != nil { - t.Option.Length = *blkRange.Length - } + t.Open.WithNullableLength(blkRange.Offset, blkRange.Length) } - if f.Node != nil { - switch typeInfo := f.Node.Address.(type) { - case *cdssdk.HttpAddressInfo: - t.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Node: *f.Node}) - t.Env().Pinned = true + switch typeInfo := f.Node.Address.(type) { + case *cdssdk.HttpAddressInfo: + t.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Node: f.Node}) + t.Env().Pinned = true - case *cdssdk.GRPCAddressInfo: - t.Env().ToEnvWorker(&ioswitch2.AgentWorker{Node: *f.Node}) - t.Env().Pinned = true + case *cdssdk.GRPCAddressInfo: + t.Env().ToEnvWorker(&ioswitch2.AgentWorker{Node: f.Node}) + t.Env().Pinned = true - default: - return nil, fmt.Errorf("unsupported node address type %T", typeInfo) - } + default: + return nil, fmt.Errorf("unsupported node address type %T", typeInfo) } return t, nil diff --git a/common/pkgs/ioswitchlrc/fromto.go b/common/pkgs/ioswitchlrc/fromto.go index ef3ad0e..2272d9e 100644 --- a/common/pkgs/ioswitchlrc/fromto.go +++ b/common/pkgs/ioswitchlrc/fromto.go @@ -3,6 +3,7 @@ package ioswitchlrc import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/storage/common/pkgs/shardstore/types" ) type From interface { @@ -37,12 +38,13 @@ func (f *FromDriver) GetDataIndex() int { } type FromNode struct { - FileHash string - Node *cdssdk.Node + FileHash types.FileHash + Node cdssdk.Node + Storage cdssdk.Storage DataIndex int } -func NewFromNode(fileHash string, node *cdssdk.Node, dataIndex int) *FromNode { +func NewFromNode(fileHash types.FileHash, node cdssdk.Node, storage cdssdk.Storage, dataIndex int) 
*FromNode { return &FromNode{ FileHash: fileHash, Node: node, diff --git a/common/pkgs/ioswitchlrc/ops2/chunked.go b/common/pkgs/ioswitchlrc/ops2/chunked.go index 9707c20..ab849b2 100644 --- a/common/pkgs/ioswitchlrc/ops2/chunked.go +++ b/common/pkgs/ioswitchlrc/ops2/chunked.go @@ -1,7 +1,6 @@ package ops2 import ( - "context" "fmt" "io" @@ -26,8 +25,8 @@ type ChunkedSplit struct { PaddingZeros bool `json:"paddingZeros"` } -func (o *ChunkedSplit) Execute(ctx context.Context, e *exec.Executor) error { - err := e.BindVars(ctx, o.Input) +func (o *ChunkedSplit) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + err := e.BindVars(ctx.Context, o.Input) if err != nil { return err } @@ -39,7 +38,7 @@ func (o *ChunkedSplit) Execute(ctx context.Context, e *exec.Executor) error { sem := semaphore.NewWeighted(int64(len(outputs))) for i := range outputs { - sem.Acquire(ctx, 1) + sem.Acquire(ctx.Context, 1) o.Outputs[i].Stream = io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { sem.Release(1) @@ -47,7 +46,7 @@ func (o *ChunkedSplit) Execute(ctx context.Context, e *exec.Executor) error { } exec.PutArrayVars(e, o.Outputs) - return sem.Acquire(ctx, int64(len(outputs))) + return sem.Acquire(ctx.Context, int64(len(outputs))) } func (o *ChunkedSplit) String() string { @@ -66,8 +65,8 @@ type ChunkedJoin struct { ChunkSize int `json:"chunkSize"` } -func (o *ChunkedJoin) Execute(ctx context.Context, e *exec.Executor) error { - err := exec.BindArrayVars(e, ctx, o.Inputs) +func (o *ChunkedJoin) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + err := exec.BindArrayVars(e, ctx.Context, o.Inputs) if err != nil { return err } @@ -88,7 +87,7 @@ func (o *ChunkedJoin) Execute(ctx context.Context, e *exec.Executor) error { }) e.PutVars(o.Output) - return fut.Wait(ctx) + return fut.Wait(ctx.Context) } func (o *ChunkedJoin) String() string { diff --git a/common/pkgs/ioswitchlrc/ops2/clone.go b/common/pkgs/ioswitchlrc/ops2/clone.go index 66d1194..e195057 100644 --- 
a/common/pkgs/ioswitchlrc/ops2/clone.go +++ b/common/pkgs/ioswitchlrc/ops2/clone.go @@ -1,7 +1,6 @@ package ops2 import ( - "context" "fmt" "io" @@ -23,8 +22,8 @@ type CloneStream struct { Cloneds []*exec.StreamVar `json:"cloneds"` } -func (o *CloneStream) Execute(ctx context.Context, e *exec.Executor) error { - err := e.BindVars(ctx, o.Raw) +func (o *CloneStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + err := e.BindVars(ctx.Context, o.Raw) if err != nil { return err } @@ -34,7 +33,7 @@ func (o *CloneStream) Execute(ctx context.Context, e *exec.Executor) error { sem := semaphore.NewWeighted(int64(len(o.Cloneds))) for i, s := range cloned { - sem.Acquire(ctx, 1) + sem.Acquire(ctx.Context, 1) o.Cloneds[i].Stream = io2.AfterReadClosedOnce(s, func(closer io.ReadCloser) { sem.Release(1) @@ -42,7 +41,7 @@ func (o *CloneStream) Execute(ctx context.Context, e *exec.Executor) error { } exec.PutArrayVars(e, o.Cloneds) - return sem.Acquire(ctx, int64(len(o.Cloneds))) + return sem.Acquire(ctx.Context, int64(len(o.Cloneds))) } func (o *CloneStream) String() string { @@ -54,8 +53,8 @@ type CloneVar struct { Cloneds []exec.Var `json:"cloneds"` } -func (o *CloneVar) Execute(ctx context.Context, e *exec.Executor) error { - err := e.BindVars(ctx, o.Raw) +func (o *CloneVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + err := e.BindVars(ctx.Context, o.Raw) if err != nil { return err } diff --git a/common/pkgs/ioswitchlrc/ops2/ec.go b/common/pkgs/ioswitchlrc/ops2/ec.go index 358a0e3..6ce0d7e 100644 --- a/common/pkgs/ioswitchlrc/ops2/ec.go +++ b/common/pkgs/ioswitchlrc/ops2/ec.go @@ -1,7 +1,6 @@ package ops2 import ( - "context" "fmt" "io" @@ -28,8 +27,8 @@ type GalMultiply struct { ChunkSize int `json:"chunkSize"` } -func (o *GalMultiply) Execute(ctx context.Context, e *exec.Executor) error { - err := exec.BindArrayVars(e, ctx, o.Inputs) +func (o *GalMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + err := exec.BindArrayVars(e, 
ctx.Context, o.Inputs) if err != nil { return err } @@ -91,7 +90,7 @@ func (o *GalMultiply) Execute(ctx context.Context, e *exec.Executor) error { }() exec.PutArrayVars(e, o.Outputs) - err = fut.Wait(ctx) + err = fut.Wait(ctx.Context) if err != nil { for _, wr := range outputWrs { wr.CloseWithError(err) diff --git a/common/pkgs/ioswitchlrc/ops2/ipfs.go b/common/pkgs/ioswitchlrc/ops2/ipfs.go deleted file mode 100644 index f8e4e2a..0000000 --- a/common/pkgs/ioswitchlrc/ops2/ipfs.go +++ /dev/null @@ -1,170 +0,0 @@ -package ops2 - -import ( - "context" - "fmt" - "io" - - "gitlink.org.cn/cloudream/common/pkgs/future" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" - "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/ipfs" - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/utils/io2" - stgglb "gitlink.org.cn/cloudream/storage/common/globals" -) - -func init() { - exec.UseOp[*IPFSRead]() - exec.UseOp[*IPFSWrite]() -} - -type IPFSRead struct { - Output *exec.StreamVar `json:"output"` - FileHash string `json:"fileHash"` - Option ipfs.ReadOption `json:"option"` -} - -func (o *IPFSRead) Execute(ctx context.Context, e *exec.Executor) error { - logger. - WithField("FileHash", o.FileHash). 
- Debugf("ipfs read op") - defer logger.Debugf("ipfs read op finished") - - ipfsCli, err := stgglb.IPFSPool.Acquire() - if err != nil { - return fmt.Errorf("new ipfs client: %w", err) - } - defer stgglb.IPFSPool.Release(ipfsCli) - - file, err := ipfsCli.OpenRead(o.FileHash, o.Option) - if err != nil { - return fmt.Errorf("reading ipfs: %w", err) - } - defer file.Close() - - fut := future.NewSetVoid() - o.Output.Stream = io2.AfterReadClosedOnce(file, func(closer io.ReadCloser) { - fut.SetVoid() - }) - e.PutVars(o.Output) - - return fut.Wait(ctx) -} - -func (o *IPFSRead) String() string { - return fmt.Sprintf("IPFSRead %v -> %v", o.FileHash, o.Output.ID) -} - -type IPFSWrite struct { - Input *exec.StreamVar `json:"input"` - FileHash *exec.StringVar `json:"fileHash"` -} - -func (o *IPFSWrite) Execute(ctx context.Context, e *exec.Executor) error { - logger. - WithField("Input", o.Input.ID). - WithField("FileHashVar", o.FileHash.ID). - Debugf("ipfs write op") - - ipfsCli, err := stgglb.IPFSPool.Acquire() - if err != nil { - return fmt.Errorf("new ipfs client: %w", err) - } - defer stgglb.IPFSPool.Release(ipfsCli) - - err = e.BindVars(ctx, o.Input) - if err != nil { - return err - } - defer o.Input.Stream.Close() - - o.FileHash.Value, err = ipfsCli.CreateFile(o.Input.Stream) - if err != nil { - return fmt.Errorf("creating ipfs file: %w", err) - } - - e.PutVars(o.FileHash) - - return nil -} - -func (o *IPFSWrite) String() string { - return fmt.Sprintf("IPFSWrite %v -> %v", o.Input.ID, o.FileHash.ID) -} - -type IPFSReadNode struct { - dag.NodeBase - FileHash string - Option ipfs.ReadOption -} - -func (b *GraphNodeBuilder) NewIPFSRead(fileHash string, option ipfs.ReadOption) *IPFSReadNode { - node := &IPFSReadNode{ - FileHash: fileHash, - Option: option, - } - b.AddNode(node) - node.OutputStreams().SetupNew(node, b.NewStreamVar()) - return node -} - -func (t *IPFSReadNode) Output() dag.StreamSlot { - return dag.StreamSlot{ - Var: t.OutputStreams().Get(0), - Index: 0, - } -} 
- -func (t *IPFSReadNode) GenerateOp() (exec.Op, error) { - return &IPFSRead{ - Output: t.OutputStreams().Get(0).Var, - FileHash: t.FileHash, - Option: t.Option, - }, nil -} - -// func (t *IPFSReadType) String() string { -// return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node)) -// } - -type IPFSWriteNode struct { - dag.NodeBase - FileHashStoreKey string -} - -func (b *GraphNodeBuilder) NewIPFSWrite(fileHashStoreKey string) *IPFSWriteNode { - node := &IPFSWriteNode{ - FileHashStoreKey: fileHashStoreKey, - } - b.AddNode(node) - return node -} - -func (t *IPFSWriteNode) SetInput(input *dag.StreamVar) { - t.InputStreams().EnsureSize(1) - input.Connect(t, 0) - t.OutputValues().SetupNew(t, t.Graph().NewValueVar(dag.StringValueVar)) -} - -func (t *IPFSWriteNode) Input() dag.StreamSlot { - return dag.StreamSlot{ - Var: t.InputStreams().Get(0), - Index: 0, - } -} - -func (t *IPFSWriteNode) FileHashVar() *dag.ValueVar { - return t.OutputValues().Get(0) -} - -func (t *IPFSWriteNode) GenerateOp() (exec.Op, error) { - return &IPFSWrite{ - Input: t.InputStreams().Get(0).Var, - FileHash: t.OutputValues().Get(0).Var.(*exec.StringVar), - }, nil -} - -// func (t *IPFSWriteType) String() string { -// return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) -// } diff --git a/common/pkgs/ioswitchlrc/ops2/range.go b/common/pkgs/ioswitchlrc/ops2/range.go index 1475b4e..90e6f5c 100644 --- a/common/pkgs/ioswitchlrc/ops2/range.go +++ b/common/pkgs/ioswitchlrc/ops2/range.go @@ -1,7 +1,6 @@ package ops2 import ( - "context" "fmt" "io" @@ -23,8 +22,8 @@ type Range struct { Length *int64 `json:"length"` } -func (o *Range) Execute(ctx context.Context, e *exec.Executor) error { - err := e.BindVars(ctx, o.Input) +func (o *Range) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + err := e.BindVars(ctx.Context, o.Input) if err 
!= nil { return err } @@ -54,7 +53,7 @@ func (o *Range) Execute(ctx context.Context, e *exec.Executor) error { }) e.PutVars(o.Output) - return fut.Wait(ctx) + return fut.Wait(ctx.Context) } o.Output.Stream = io2.AfterEOF(io2.Length(o.Input.Stream, *o.Length), func(closer io.ReadCloser, err error) { @@ -62,7 +61,7 @@ func (o *Range) Execute(ctx context.Context, e *exec.Executor) error { }) e.PutVars(o.Output) - err = fut.Wait(ctx) + err = fut.Wait(ctx.Context) if err != nil { return err } diff --git a/common/pkgs/ioswitchlrc/ops2/shard_store.go b/common/pkgs/ioswitchlrc/ops2/shard_store.go new file mode 100644 index 0000000..dfae8ad --- /dev/null +++ b/common/pkgs/ioswitchlrc/ops2/shard_store.go @@ -0,0 +1,188 @@ +package ops2 + +import ( + "fmt" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" + "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" + "gitlink.org.cn/cloudream/common/pkgs/logger" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/storage/common/pkgs/shardstore/pool" + "gitlink.org.cn/cloudream/storage/common/pkgs/shardstore/types" +) + +func init() { + exec.UseOp[*ShardRead]() + exec.UseOp[*ShardWrite]() +} + +type ShardRead struct { + Output *exec.StreamVar `json:"output"` + StorageID cdssdk.StorageID `json:"storageID"` + Open types.OpenOption `json:"option"` +} + +func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + logger. + WithField("Open", o.Open). 
+ Debugf("reading from shard store") + defer logger.Debugf("reading from shard store finished") + + pool, err := exec.ValueByType[*pool.ShardStorePool](ctx) + if err != nil { + return fmt.Errorf("getting shard store pool: %w", err) + } + + store, err := pool.Get(o.StorageID) + if err != nil { + return fmt.Errorf("getting shard store %v: %w", o.StorageID, err) + } + + file, err := store.Open(o.Open) + if err != nil { + return fmt.Errorf("opening shard store file: %w", err) + } + + fut := future.NewSetVoid() + o.Output.Stream = io2.AfterReadClosedOnce(file, func(closer io.ReadCloser) { + fut.SetVoid() + }) + e.PutVars(o.Output) + + return fut.Wait(ctx.Context) +} + +func (o *ShardRead) String() string { + return fmt.Sprintf("ShardRead %v -> %v", o.Open, o.Output.ID) +} + +type ShardWrite struct { + Input *exec.StreamVar `json:"input"` + FileHash *exec.StringVar `json:"fileHash"` + StorageID cdssdk.StorageID `json:"storageID"` +} + +func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { + logger. + WithField("Input", o.Input.ID). + WithField("FileHashVar", o.FileHash.ID). 
+ Debugf("writing file to shard store") + defer logger.Debugf("write to shard store finished") + + pool, err := exec.ValueByType[*pool.ShardStorePool](ctx) + if err != nil { + return fmt.Errorf("getting shard store pool: %w", err) + } + + store, err := pool.Get(o.StorageID) + if err != nil { + return fmt.Errorf("getting shard store %v: %w", o.StorageID, err) + } + + err = e.BindVars(ctx.Context, o.Input) + if err != nil { + return err + } + defer o.Input.Stream.Close() + + writer := store.New() + defer writer.Abort() + + _, err = io.Copy(writer, o.Input.Stream) + if err != nil { + return fmt.Errorf("writing file to shard store: %w", err) + } + + fileInfo, err := writer.Finish() + if err != nil { + return fmt.Errorf("finishing writing file to shard store: %w", err) + } + + o.FileHash.Value = string(fileInfo.Hash) + + e.PutVars(o.FileHash) + return nil +} + +func (o *ShardWrite) String() string { + return fmt.Sprintf("ShardWrite %v -> %v", o.Input.ID, o.FileHash.ID) +} + +type ShardReadNode struct { + dag.NodeBase + StorageID cdssdk.StorageID + Open types.OpenOption +} + +func (b *GraphNodeBuilder) NewIPFSRead(stgID cdssdk.StorageID, open types.OpenOption) *ShardReadNode { + node := &ShardReadNode{ + StorageID: stgID, + Open: open, + } + b.AddNode(node) + node.OutputStreams().SetupNew(node, b.NewStreamVar()) + return node +} + +func (t *ShardReadNode) Output() dag.StreamSlot { + return dag.StreamSlot{ + Var: t.OutputStreams().Get(0), + Index: 0, + } +} + +func (t *ShardReadNode) GenerateOp() (exec.Op, error) { + return &ShardRead{ + Output: t.OutputStreams().Get(0).Var, + StorageID: t.StorageID, + Open: t.Open, + }, nil +} + +// func (t *IPFSReadType) String() string { +// return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node)) +// } + +type IPFSWriteNode struct { + dag.NodeBase + FileHashStoreKey string +} + +func (b *GraphNodeBuilder) NewIPFSWrite(fileHashStoreKey string) *IPFSWriteNode { 
+ node := &IPFSWriteNode{ + FileHashStoreKey: fileHashStoreKey, + } + b.AddNode(node) + return node +} + +func (t *IPFSWriteNode) SetInput(input *dag.StreamVar) { + t.InputStreams().EnsureSize(1) + input.Connect(t, 0) + t.OutputValues().SetupNew(t, t.Graph().NewValueVar(dag.StringValueVar)) +} + +func (t *IPFSWriteNode) Input() dag.StreamSlot { + return dag.StreamSlot{ + Var: t.InputStreams().Get(0), + Index: 0, + } +} + +func (t *IPFSWriteNode) FileHashVar() *dag.ValueVar { + return t.OutputValues().Get(0) +} + +func (t *IPFSWriteNode) GenerateOp() (exec.Op, error) { + return &ShardWrite{ + Input: t.InputStreams().Get(0).Var, + FileHash: t.OutputValues().Get(0).Var.(*exec.StringVar), + }, nil +} + +// func (t *IPFSWriteType) String() string { +// return fmt.Sprintf("IPFSWrite[%s,%v+%v]%v%v", t.FileHashStoreKey, t.Range.Offset, t.Range.Length, formatStreamIO(node), formatValueIO(node)) +// } diff --git a/common/pkgs/ioswitchlrc/parser/passes.go b/common/pkgs/ioswitchlrc/parser/passes.go index 4a5ba37..94dfefa 100644 --- a/common/pkgs/ioswitchlrc/parser/passes.go +++ b/common/pkgs/ioswitchlrc/parser/passes.go @@ -6,10 +6,10 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/ipfs" "gitlink.org.cn/cloudream/common/utils/math2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitchlrc/ops2" + "gitlink.org.cn/cloudream/storage/common/pkgs/shardstore/types" ) // 计算输入流的打开范围。会把流的范围按条带大小取整 @@ -63,27 +63,16 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (ops2.FromNode, err switch f := f.(type) { case *ioswitchlrc.FromNode: - t := ctx.DAG.NewIPFSRead(f.FileHash, ipfs.ReadOption{ - Offset: 0, - Length: -1, - }) + t := ctx.DAG.NewIPFSRead(f.Storage.StorageID, types.NewOpen(f.FileHash)) if f.DataIndex == -1 { - t.Option.Offset = repRange.Offset - if repRange.Length != nil { - t.Option.Length = 
*repRange.Length - } + t.Open.WithNullableLength(repRange.Offset, repRange.Length) } else { - t.Option.Offset = blkRange.Offset - if blkRange.Length != nil { - t.Option.Length = *blkRange.Length - } + t.Open.WithNullableLength(blkRange.Offset, blkRange.Length) } - if f.Node != nil { - t.Env().ToEnvWorker(&ioswitchlrc.AgentWorker{Node: *f.Node}) - t.Env().Pinned = true - } + t.Env().ToEnvWorker(&ioswitchlrc.AgentWorker{Node: f.Node}) + t.Env().Pinned = true return t, nil