diff --git a/agent/internal/grpc/io.go b/agent/internal/grpc/io.go index b7977de..4ae7be5 100644 --- a/agent/internal/grpc/io.go +++ b/agent/internal/grpc/io.go @@ -127,10 +127,17 @@ func (s *Service) GetStream(req *agtrpc.GetStreamReq, server agtrpc.Agent_GetStr return fmt.Errorf("plan not found") } + signal, err := serder.JSONToObjectEx[*ioswitch.SignalVar]([]byte(req.Signal)) + if err != nil { + return fmt.Errorf("deserializing var: %w", err) + } + + sw.PutVars(signal) + strVar := &ioswitch.StreamVar{ ID: ioswitch.VarID(req.VarID), } - err := sw.BindVars(server.Context(), strVar) + err = sw.BindVars(server.Context(), strVar) if err != nil { return fmt.Errorf("binding vars: %w", err) } @@ -214,6 +221,13 @@ func (s *Service) GetVar(ctx context.Context, req *agtrpc.GetVarReq) (*agtrpc.Ge return nil, fmt.Errorf("deserializing var: %w", err) } + signal, err := serder.JSONToObjectEx[*ioswitch.SignalVar]([]byte(req.Signal)) + if err != nil { + return nil, fmt.Errorf("deserializing var: %w", err) + } + + sw.PutVars(signal) + err = sw.BindVars(ctx, v) if err != nil { return nil, fmt.Errorf("binding vars: %w", err) diff --git a/common/pkgs/cmd/upload_objects.go b/common/pkgs/cmd/upload_objects.go index 9c634d0..b9c1ca1 100644 --- a/common/pkgs/cmd/upload_objects.go +++ b/common/pkgs/cmd/upload_objects.go @@ -238,12 +238,19 @@ func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) { } func uploadToNode(file io.Reader, node cdssdk.Node) (string, error) { - plan := plans.NewPlanBuilder() - str, v := plan.AtExecutor().WillWrite() - v.To(node).IPFSWrite().ToExecutor().Store("fileHash") + ft := plans.NewFromTo() + fromExec, hd := plans.NewFromExecutor(-1) + ft.AddFrom(fromExec).AddTo(plans.NewToNode(node, -1, "fileHash")) - exec := plan.Execute() - exec.BeginWrite(io.NopCloser(file), str) + parser := plans.NewParser(cdssdk.DefaultECRedundancy) + plans := plans.NewPlanBuilder() + err := parser.Parse(ft, plans) + if err != nil { + return "", fmt.Errorf("parsing plan: %w", err) + } + + exec := plans.Execute() + exec.BeginWrite(io.NopCloser(file), hd) ret, err := exec.Wait(context.TODO()) if err != nil { return "", err diff --git a/common/pkgs/downloader/io.go b/common/pkgs/downloader/io.go deleted file mode 100644 index 464c429..0000000 --- a/common/pkgs/downloader/io.go +++ /dev/null @@ -1,148 +0,0 @@ -package downloader - -import ( - "context" - "fmt" - "io" - - "gitlink.org.cn/cloudream/common/pkgs/ipfs" - "gitlink.org.cn/cloudream/common/pkgs/logger" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/common/utils/io2" - stgglb "gitlink.org.cn/cloudream/storage/common/globals" - "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans" -) - -type IPFSReader struct { - node cdssdk.Node - fileHash string - stream io.ReadCloser - offset int64 -} - -func NewIPFSReader(node cdssdk.Node, fileHash string) *IPFSReader { - return &IPFSReader{ - node: node, - fileHash: fileHash, - } -} - -func NewIPFSReaderWithRange(node cdssdk.Node, fileHash string, rng ipfs.ReadOption) io.ReadCloser { - str := &IPFSReader{ - node: node, - fileHash: fileHash, - } - str.Seek(rng.Offset, io.SeekStart) - if rng.Length > 0 { - return io2.Length(str, rng.Length) - } - - return str -} - -func (r *IPFSReader) Seek(offset int64, whence int) (int64, error) { - if whence == io.SeekEnd { - return 0, fmt.Errorf("seek end not supported") - } - - if whence == io.SeekCurrent { - return 0, fmt.Errorf("seek current not supported") - } - - if r.stream == nil { - r.offset = offset - return r.offset, nil - } - - // 如果文件流已经打开,而seek的位置和当前位置不同,则需要重新打开文件流 - if offset != r.offset { - var err error - r.stream.Close() - r.offset = offset - r.stream, err = r.openStream() - if err != nil { - return 0, fmt.Errorf("reopen stream: %w", err) - } - } - - return r.offset, nil -} - -func (r *IPFSReader) Read(buf []byte) (int, error) { - if r.stream == nil { - var err error - r.stream, err = r.openStream() - if err != nil { - return 0, err - } - } - - n, err := r.stream.Read(buf) - r.offset += int64(n) - return n, err -} - -func (r *IPFSReader) Close() error { - if r.stream != nil { - return r.stream.Close() - } - - return nil -} - -func (r *IPFSReader) openStream() (io.ReadCloser, error) { - if stgglb.IPFSPool != nil { - logger.Debug("try to use local IPFS to download file") - - reader, err := r.fromLocalIPFS() - if err == nil { - return reader, nil - } - - logger.Warnf("download from local IPFS failed, so try to download from node %v, err: %s", r.node.Name, err.Error()) - } - - return r.fromNode() -} - -func (r *IPFSReader) fromNode() (io.ReadCloser, error) { - planBld := plans.NewPlanBuilder() - toExe, toStr := plans.NewToExecutor(-1) - ft := plans.FromTo{ - Froms: []plans.From{ - plans.NewFromIPFS(r.node, r.fileHash, -1), - }, - Tos: []plans.To{ - toExe, - }, - } - par := plans.DefaultParser{} - par.Parse(ft, planBld) - - exec := planBld.Execute() - go func() { - exec.Wait(context.Background()) - }() - - return exec.BeginRead(toStr) -} - -func (r *IPFSReader) fromLocalIPFS() (io.ReadCloser, error) { - ipfsCli, err := stgglb.IPFSPool.Acquire() - if err != nil { - return nil, fmt.Errorf("new ipfs client: %w", err) - } - - reader, err := ipfsCli.OpenRead(r.fileHash, ipfs.ReadOption{ - Offset: r.offset, - Length: -1, - }) - if err != nil { - return nil, fmt.Errorf("read ipfs file failed, err: %w", err) - } - - reader = io2.AfterReadClosed(reader, func(io.ReadCloser) { - ipfsCli.Close() - }) - return reader, nil -} diff --git a/common/pkgs/downloader/iterator.go b/common/pkgs/downloader/iterator.go index 6fa7078..5336944 100644 --- a/common/pkgs/downloader/iterator.go +++ b/common/pkgs/downloader/iterator.go @@ -11,7 +11,6 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/bitmap" - "gitlink.org.cn/cloudream/common/pkgs/ipfs" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" @@ -172,45 +171,20 @@ func (iter *DownloadObjectIterator) downloadNoneOrRepObject(obj downloadReqeust2 return nil, err } - var strHandle *plans.ExecutorReadStream - ft := plans.FromTo{ - Object: *obj.Detail, - } - bsc, blocks := iter.getMinReadingBlockSolution(allNodes, 1) osc, node := iter.getMinReadingObjectSolution(allNodes, 1) if bsc < osc { logger.Debugf("downloading object from node %v(%v)", blocks[0].Node.Name, blocks[0].Node.NodeID) + return iter.downloadFromNode(&blocks[0].Node, obj) + } - toExec, handle := plans.NewToExecutor(-1) - ft.AddFrom(plans.NewFromNode(&blocks[0].Node, -1)).AddTo(toExec) - strHandle = handle - - // TODO2 处理Offset和Length - } else if osc == math.MaxFloat64 { + if osc == math.MaxFloat64 { // bsc >= osc,如果osc是MaxFloat64,那么bsc也一定是,也就意味着没有足够块来恢复文件 return nil, fmt.Errorf("no node has this object") - } else { - logger.Debugf("downloading object from node %v(%v)", node.Name, node.NodeID) - - toExec, handle := plans.NewToExecutor(-1) - ft.AddFrom(plans.NewFromNode(node, -1)).AddTo(toExec) - strHandle = handle - // TODO2 处理Offset和Length - } - - parser := plans.DefaultParser{ - EC: cdssdk.DefaultECRedundancy, - } - plans := plans.NewPlanBuilder() - if err := parser.Parse(ft, plans); err != nil { - return nil, fmt.Errorf("parsing plan: %w", err) } - exec := plans.Execute() - go exec.Wait(context.TODO()) - - return exec.BeginRead(strHandle) + logger.Debugf("downloading object from node %v(%v)", node.Name, node.NodeID) + return iter.downloadFromNode(node, obj) } func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed *cdssdk.ECRedundancy) (io.ReadCloser, error) { @@ -280,10 +254,7 @@ func (iter *DownloadObjectIterator) downloadECObject(req downloadReqeust2, ecRed } logger.Debugf("downloading ec object from node %v(%v)", node.Name, node.NodeID) - return NewIPFSReaderWithRange(*node, req.Detail.Object.FileHash, ipfs.ReadOption{ - Offset: req.Raw.Offset, - Length: req.Raw.Length, - }), nil + return iter.downloadFromNode(node, req) } func (iter *DownloadObjectIterator) sortDownloadNodes(req downloadReqeust2) ([]*DownloadNodeInfo, error) { @@ -389,3 +360,30 @@ func (iter *DownloadObjectIterator) getNodeDistance(node cdssdk.Node) float64 { return consts.NodeDistanceOther } + +func (iter *DownloadObjectIterator) downloadFromNode(node *cdssdk.Node, req downloadReqeust2) (io.ReadCloser, error) { + var strHandle *plans.ExecutorReadStream + ft := plans.NewFromTo() + + toExec, handle := plans.NewToExecutor(-1) + toExec.Range = plans.Range{ + Offset: req.Raw.Offset, + } + if req.Raw.Length != -1 { + len := req.Raw.Length + toExec.Range.Length = &len + } + ft.AddFrom(plans.NewFromNode(req.Detail.Object.FileHash, node, -1)).AddTo(toExec) + strHandle = handle + + parser := plans.NewParser(cdssdk.DefaultECRedundancy) + plans := plans.NewPlanBuilder() + if err := parser.Parse(ft, plans); err != nil { + return nil, fmt.Errorf("parsing plan: %w", err) + } + + exec := plans.Execute() + go exec.Wait(context.TODO()) + + return exec.BeginRead(strHandle) +} diff --git a/common/pkgs/downloader/strip_iterator.go b/common/pkgs/downloader/strip_iterator.go index da4654b..416cab9 100644 --- a/common/pkgs/downloader/strip_iterator.go +++ b/common/pkgs/downloader/strip_iterator.go @@ -1,15 +1,15 @@ package downloader import ( + "context" "io" "sync" "gitlink.org.cn/cloudream/common/pkgs/iterator" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/common/utils/sync2" stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gitlink.org.cn/cloudream/storage/common/pkgs/ec" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans" ) type downloadBlock struct { @@ -108,15 +108,33 @@ func (s *StripIterator) Close() { } func (s *StripIterator) downloading() { - rs, err := ec.NewRs(s.red.K, s.red.N) + ft := plans.NewFromTo() + for _, b := range s.blocks { + ft.AddFrom(plans.NewFromNode(b.Block.FileHash, &b.Node, b.Block.Index)) + } + + toExec, hd := plans.NewToExecutorWithRange(-1, plans.Range{ + Offset: s.curStripIndex * int64(s.red.ChunkSize*s.red.K), + }) + ft.AddTo(toExec) + + parser := plans.NewParser(*s.red) + plans := plans.NewPlanBuilder() + err := parser.Parse(ft, plans) if err != nil { s.sendToDataChan(dataChanEntry{Error: err}) return } + exec := plans.Execute() - var blockStrs []*IPFSReader - for _, b := range s.blocks { - blockStrs = append(blockStrs, NewIPFSReader(b.Node, b.Block.FileHash)) + ctx, cancel := context.WithCancel(context.Background()) + go exec.Wait(ctx) + defer cancel() + + str, err := exec.BeginRead(hd) + if err != nil { + s.sendToDataChan(dataChanEntry{Error: err}) + return } curStripIndex := s.curStripIndex @@ -148,40 +166,18 @@ loop: } } - for _, str := range blockStrs { - _, err := str.Seek(curStripIndex*int64(s.red.ChunkSize), io.SeekStart) - if err != nil { - s.sendToDataChan(dataChanEntry{Error: err}) - break loop - } - } - dataBuf := make([]byte, int64(s.red.K*s.red.ChunkSize)) - blockArrs := make([][]byte, s.red.N) - for i := 0; i < s.red.K; i++ { - // 放入的slice长度为0,但容量为ChunkSize,EC库发现长度为0的块后才会认为是待恢复块 - blockArrs[i] = dataBuf[i*s.red.ChunkSize : i*s.red.ChunkSize] - } - for _, b := range s.blocks { - // 用于恢复的块则要将其长度变回ChunkSize,用于后续读取块数据 - if b.Block.Index < s.red.K { - // 此处扩容不会导致slice指向一个新内存 - blockArrs[b.Block.Index] = blockArrs[b.Block.Index][0:s.red.ChunkSize] - } else { - blockArrs[b.Block.Index] = make([]byte, s.red.ChunkSize) - } - } - - err := sync2.ParallelDo(s.blocks, func(b downloadBlock, idx int) error { - _, err := io.ReadFull(blockStrs[idx], blockArrs[b.Block.Index]) - return err - }) - if err != nil { - s.sendToDataChan(dataChanEntry{Error: err}) + n, err := io.ReadFull(str, dataBuf) + if err == io.ErrUnexpectedEOF { + s.cache.Add(stripKey, ObjectECStrip{ + Data: dataBuf, + ObjectFileHash: s.object.FileHash, + }) + + s.sendToDataChan(dataChanEntry{Data: dataBuf[:n], Position: stripBytesPos}) + s.sendToDataChan(dataChanEntry{Error: io.EOF}) break loop } - - err = rs.ReconstructData(blockArrs) if err != nil { s.sendToDataChan(dataChanEntry{Error: err}) break loop @@ -199,10 +195,6 @@ loop: curStripIndex++ } - for _, str := range blockStrs { - str.Close() - } - close(s.dataChan) } diff --git a/common/pkgs/ec/multiply.go b/common/pkgs/ec/multiply.go index eb624d3..50f6c2d 100644 --- a/common/pkgs/ec/multiply.go +++ b/common/pkgs/ec/multiply.go @@ -3,5 +3,5 @@ package ec import "github.com/klauspost/reedsolomon" func GaloisMultiplier() *reedsolomon.MultipilerBuilder { - return &reedsolomon.MultipilerBuilder{} + return reedsolomon.DefaultMulOpt() } diff --git a/common/pkgs/grpc/agent/agent.pb.go b/common/pkgs/grpc/agent/agent.pb.go index 1805b93..08bec19 100644 --- a/common/pkgs/grpc/agent/agent.pb.go +++ b/common/pkgs/grpc/agent/agent.pb.go @@ -2,7 +2,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 +// protoc-gen-go v1.34.2 // protoc v4.22.3 // source: pkgs/grpc/agent/agent.proto @@ -156,7 +156,7 @@ func (*ExecuteIOPlanResp) Descriptor() ([]byte, []int) { return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{1} } -// 文件数据。注意:只在Type为Data的时候,Data字段才能有数据 +// 文件数据。注意:只在Type为Data或EOF的时候,Data字段才能有数据 type FileDataPacket struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -329,6 +329,7 @@ type GetStreamReq struct { PlanID string `protobuf:"bytes,1,opt,name=PlanID,proto3" json:"PlanID,omitempty"` VarID int32 `protobuf:"varint,2,opt,name=VarID,proto3" json:"VarID,omitempty"` + Signal string `protobuf:"bytes,3,opt,name=Signal,proto3" json:"Signal,omitempty"` } func (x *GetStreamReq) Reset() { @@ -377,6 +378,13 @@ func (x *GetStreamReq) GetVarID() int32 { return 0 } +func (x *GetStreamReq) GetSignal() string { + if x != nil { + return x.Signal + } + return "" +} + type SendVarReq struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -477,6 +485,7 @@ type GetVarReq struct { PlanID string `protobuf:"bytes,1,opt,name=PlanID,proto3" json:"PlanID,omitempty"` Var string `protobuf:"bytes,2,opt,name=Var,proto3" json:"Var,omitempty"` + Signal string `protobuf:"bytes,3,opt,name=Signal,proto3" json:"Signal,omitempty"` } func (x *GetVarReq) Reset() { @@ -525,12 +534,19 @@ func (x *GetVarReq) GetVar() string { return "" } +func (x *GetVarReq) GetSignal() string { + if x != nil { + return x.Signal + } + return "" +} + type GetVarResp struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Var string `protobuf:"bytes,1,opt,name=Var,proto3" json:"Var,omitempty"` + Var string `protobuf:"bytes,1,opt,name=Var,proto3" json:"Var,omitempty"` // 此处不使用VarID的原因是,Switch的BindVars函数还需要知道Var的类型 } func (x *GetVarResp) Reset() { @@ -670,19 +686,22 @@ var file_pkgs_grpc_agent_agent_proto_rawDesc = []byte{ 0x49, 0x44, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, 0x72, 0x49, 0x44, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x56, 0x61, 0x72, 0x49, 0x44, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x22, 0x10, 0x0a, 0x0e, - 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x22, 0x3c, + 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x22, 0x54, 0x0a, 0x0c, 0x47, 0x65, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, 0x72, 0x49, 0x44, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x56, 0x61, 0x72, 0x49, 0x44, 0x22, 0x36, 0x0a, 0x0a, - 0x53, 0x65, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x52, 0x65, 0x71, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x6c, - 0x61, 0x6e, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x6c, 0x61, 0x6e, - 0x49, 0x44, 0x12, 0x10, 0x0a, 0x03, 0x56, 0x61, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x03, 0x56, 0x61, 0x72, 0x22, 0x0d, 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x52, - 0x65, 0x73, 0x70, 0x22, 0x35, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x56, 0x61, 0x72, 0x52, 0x65, 0x71, - 0x12, 0x16, 0x0a, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x12, 0x10, 0x0a, 0x03, 0x56, 0x61, 0x72, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x56, 0x61, 0x72, 0x22, 0x1e, 0x0a, 0x0a, 0x47, 0x65, + 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x56, 0x61, 0x72, 0x49, 0x44, 0x12, 0x16, 0x0a, 0x06, + 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x53, 0x69, + 0x67, 0x6e, 0x61, 0x6c, 0x22, 0x36, 0x0a, 0x0a, 0x53, 0x65, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x52, + 0x65, 0x71, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x12, 0x10, 0x0a, 0x03, 0x56, 0x61, + 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x56, 0x61, 0x72, 0x22, 0x0d, 0x0a, 0x0b, + 0x53, 0x65, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x52, 0x65, 0x73, 0x70, 0x22, 0x4d, 0x0a, 0x09, 0x47, + 0x65, 0x74, 0x56, 0x61, 0x72, 0x52, 0x65, 0x71, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x6c, 0x61, 0x6e, + 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, + 0x12, 0x10, 0x0a, 0x03, 0x56, 0x61, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x56, + 0x61, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x06, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x22, 0x1e, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x56, 0x61, 0x72, 0x52, 0x65, 0x73, 0x70, 0x12, 0x10, 0x0a, 0x03, 0x56, 0x61, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x56, 0x61, 0x72, 0x22, 0x09, 0x0a, 0x07, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x22, 0x0a, 0x0a, 0x08, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, @@ -725,7 +744,7 @@ func file_pkgs_grpc_agent_agent_proto_rawDescGZIP() []byte { var file_pkgs_grpc_agent_agent_proto_enumTypes = make([]protoimpl.EnumInfo, 1) var file_pkgs_grpc_agent_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 12) -var file_pkgs_grpc_agent_agent_proto_goTypes = []interface{}{ +var file_pkgs_grpc_agent_agent_proto_goTypes = []any{ (StreamDataPacketType)(0), // 0: StreamDataPacketType (*ExecuteIOPlanReq)(nil), // 1: ExecuteIOPlanReq (*ExecuteIOPlanResp)(nil), // 2: ExecuteIOPlanResp @@ -768,7 +787,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return } if !protoimpl.UnsafeEnabled { - file_pkgs_grpc_agent_agent_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_pkgs_grpc_agent_agent_proto_msgTypes[0].Exporter = func(v any, i int) any { switch v := v.(*ExecuteIOPlanReq); i { case 0: return &v.state @@ -780,7 +799,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_pkgs_grpc_agent_agent_proto_msgTypes[1].Exporter = func(v any, i int) any { switch v := v.(*ExecuteIOPlanResp); i { case 0: return &v.state @@ -792,7 +811,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + file_pkgs_grpc_agent_agent_proto_msgTypes[2].Exporter = func(v any, i int) any { switch v := v.(*FileDataPacket); i { case 0: return &v.state @@ -804,7 +823,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + file_pkgs_grpc_agent_agent_proto_msgTypes[3].Exporter = func(v any, i int) any { switch v := v.(*StreamDataPacket); i { case 0: return &v.state @@ -816,7 +835,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + file_pkgs_grpc_agent_agent_proto_msgTypes[4].Exporter = func(v any, i int) any { switch v := v.(*SendStreamResp); i { case 0: return &v.state @@ -828,7 +847,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + file_pkgs_grpc_agent_agent_proto_msgTypes[5].Exporter = func(v any, i int) any { switch v := v.(*GetStreamReq); i { case 0: return &v.state @@ -840,7 +859,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + file_pkgs_grpc_agent_agent_proto_msgTypes[6].Exporter = func(v any, i int) any { switch v := v.(*SendVarReq); i { case 0: return &v.state @@ -852,7 +871,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + file_pkgs_grpc_agent_agent_proto_msgTypes[7].Exporter = func(v any, i int) any { switch v := v.(*SendVarResp); i { case 0: return &v.state @@ -864,7 +883,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + file_pkgs_grpc_agent_agent_proto_msgTypes[8].Exporter = func(v any, i int) any { switch v := v.(*GetVarReq); i { case 0: return &v.state @@ -876,7 +895,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + file_pkgs_grpc_agent_agent_proto_msgTypes[9].Exporter = func(v any, i int) any { switch v := v.(*GetVarResp); i { case 0: return &v.state @@ -888,7 +907,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { + file_pkgs_grpc_agent_agent_proto_msgTypes[10].Exporter = func(v any, i int) any { switch v := v.(*PingReq); i { case 0: return &v.state @@ -900,7 +919,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } - file_pkgs_grpc_agent_agent_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { + file_pkgs_grpc_agent_agent_proto_msgTypes[11].Exporter = func(v any, i int) any { switch v := v.(*PingResp); i { case 0: return &v.state diff --git a/common/pkgs/grpc/agent/agent.proto b/common/pkgs/grpc/agent/agent.proto index 1b22e48..c4c7627 100644 --- a/common/pkgs/grpc/agent/agent.proto +++ b/common/pkgs/grpc/agent/agent.proto @@ -37,6 +37,7 @@ message SendStreamResp {} message GetStreamReq { string PlanID = 1; int32 VarID = 2; + string Signal = 3; } message SendVarReq { @@ -48,6 +49,7 @@ message SendVarResp {} message GetVarReq { string PlanID = 1; string Var = 2; + string Signal = 3; } message GetVarResp { string Var = 1; // 此处不使用VarID的原因是,Switch的BindVars函数还需要知道Var的类型 diff --git a/common/pkgs/grpc/agent/client.go b/common/pkgs/grpc/agent/client.go index 0eecb88..b206ad7 100644 --- a/common/pkgs/grpc/agent/client.go +++ b/common/pkgs/grpc/agent/client.go @@ -132,12 +132,18 @@ func (c *Client) SendStream(ctx context.Context, planID ioswitch.PlanID, varID i } } -func (c *Client) GetStream(planID ioswitch.PlanID, varID ioswitch.VarID) (io.ReadCloser, error) { +func (c *Client) GetStream(planID ioswitch.PlanID, varID ioswitch.VarID, signal *ioswitch.SignalVar) (io.ReadCloser, error) { ctx, cancel := context.WithCancel(context.Background()) + sdata, err := serder.ObjectToJSONEx(signal) + if err != nil { + return nil, err + } + stream, err := c.cli.GetStream(ctx, &GetStreamReq{ PlanID: string(planID), VarID: int32(varID), + Signal: string(sdata), }) if err != nil { cancel() @@ -163,15 +169,21 @@ func (c *Client) SendVar(ctx context.Context, planID ioswitch.PlanID, v ioswitch return err } -func (c *Client) GetVar(ctx context.Context, planID ioswitch.PlanID, v ioswitch.Var) (ioswitch.Var, error) { - data, err := serder.ObjectToJSONEx(v) +func (c *Client) GetVar(ctx context.Context, planID ioswitch.PlanID, v ioswitch.Var, signal *ioswitch.SignalVar) (ioswitch.Var, error) { + vdata, err := serder.ObjectToJSONEx(v) + if err != nil { + return nil, err + } + + sdata, err := serder.ObjectToJSONEx(signal) if err != nil { return nil, err } resp, err := c.cli.GetVar(ctx, &GetVarReq{ PlanID: string(planID), - Var: string(data), + Var: string(vdata), + Signal: string(sdata), }) if err != nil { return nil, err diff --git a/common/pkgs/ioswitch/ioswitch.go b/common/pkgs/ioswitch/ioswitch.go index d49b89f..a72989a 100644 --- a/common/pkgs/ioswitch/ioswitch.go +++ b/common/pkgs/ioswitch/ioswitch.go @@ -24,6 +24,8 @@ type Var interface { var VarUnion = types.NewTypeUnion[Var]( (*IntVar)(nil), (*StringVar)(nil), + (*SignalVar)(nil), + (*StreamVar)(nil), ) var _ = serder.UseTypeUnionExternallyTagged(&VarUnion) diff --git a/common/pkgs/ioswitch/ops/chunked.go b/common/pkgs/ioswitch/ops/chunked.go index c1e6ab7..048761c 100644 --- a/common/pkgs/ioswitch/ops/chunked.go +++ b/common/pkgs/ioswitch/ops/chunked.go @@ -64,7 +64,7 @@ func (o *ChunkedJoin) Execute(ctx context.Context, sw *ioswitch.Switch) error { }() fut := future.NewSetVoid() - o.Output.Stream = io2.AfterReadClosedOnce(io2.ChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) { + o.Output.Stream = io2.AfterReadClosedOnce(io2.BufferedChunkedJoin(strReaders, o.ChunkSize), func(closer io.ReadCloser) { fut.SetVoid() }) sw.PutVars(o.Output) diff --git a/common/pkgs/ioswitch/ops/ec.go b/common/pkgs/ioswitch/ops/ec.go index 60843ed..b64f351 100644 --- a/common/pkgs/ioswitch/ops/ec.go +++ b/common/pkgs/ioswitch/ops/ec.go @@ -119,14 +119,11 @@ func (o *ECMultiply) Execute(ctx context.Context, sw *ioswitch.Switch) error { } }() - outputVars := make([]*ioswitch.StreamVar, len(o.Outputs)) outputWrs := make([]*io.PipeWriter, len(o.Outputs)) for i := range o.Outputs { rd, wr := io.Pipe() - outputVars[i] = &ioswitch.StreamVar{ - Stream: rd, - } + o.Outputs[i].Stream = rd outputWrs[i] = wr } @@ -173,7 +170,7 @@ func (o *ECMultiply) Execute(ctx context.Context, sw *ioswitch.Switch) error { } }() - ioswitch.PutArrayVars(sw, outputVars) + ioswitch.PutArrayVars(sw, o.Outputs) err = fut.Wait(ctx) if err != nil { for _, wr := range outputWrs { diff --git a/common/pkgs/ioswitch/ops/grpc.go b/common/pkgs/ioswitch/ops/grpc.go index e14cbee..28d688a 100644 --- a/common/pkgs/ioswitch/ops/grpc.go +++ b/common/pkgs/ioswitch/ops/grpc.go @@ -44,6 +44,7 @@ func (o *SendStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { } type GetStream struct { + Signal *ioswitch.SignalVar `json:"signal"` Get *ioswitch.StreamVar `json:"get"` Output *ioswitch.StreamVar `json:"output"` Node cdssdk.Node `json:"node"` @@ -58,7 +59,7 @@ func (o *GetStream) Execute(ctx context.Context, sw *ioswitch.Switch) error { logger.Debugf("getting stream %v as %v from node %v", o.Get.ID, o.Output.ID, o.Node) - str, err := agtCli.GetStream(sw.Plan().ID, o.Get.ID) + str, err := agtCli.GetStream(sw.Plan().ID, o.Get.ID, o.Signal) if err != nil { return fmt.Errorf("getting stream: %w", err) } @@ -103,9 +104,10 @@ func (o *SendVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { } type GetVar struct { - Get ioswitch.Var `json:"get"` - Output ioswitch.Var `json:"output"` - Node cdssdk.Node `json:"node"` + Signal *ioswitch.SignalVar `json:"signal"` + Get ioswitch.Var `json:"get"` + Output ioswitch.Var `json:"output"` + Node cdssdk.Node `json:"node"` } func (o *GetVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { @@ -117,7 +119,7 @@ func (o *GetVar) Execute(ctx context.Context, sw *ioswitch.Switch) error { logger.Debugf("getting var %v as %v from node %v", o.Get.GetID(), o.Output.GetID(), o.Node) - v2, err := agtCli.GetVar(ctx, sw.Plan().ID, o.Get) + v2, err := agtCli.GetVar(ctx, sw.Plan().ID, o.Get, o.Signal) if err != nil { return fmt.Errorf("getting var: %w", err) } diff --git a/common/pkgs/ioswitch/ops/range.go b/common/pkgs/ioswitch/ops/range.go new file mode 100644 index 0000000..4c9634b --- /dev/null +++ b/common/pkgs/ioswitch/ops/range.go @@ -0,0 +1,70 @@ +package ops + +import ( + "context" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/common/utils/math2" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +type Range struct { + Input *ioswitch.StreamVar `json:"input"` + Output *ioswitch.StreamVar `json:"output"` + Offset int64 `json:"offset"` + Length *int64 `json:"length"` +} + +func (o *Range) Execute(ctx context.Context, sw *ioswitch.Switch) error { + err := sw.BindVars(ctx, o.Input) + if err != nil { + return err + } + defer o.Input.Stream.Close() + + buf := make([]byte, 1024*16) + + // 跳过前Offset个字节 + for o.Offset > 0 { + rdCnt := math2.Min(o.Offset, int64(len(buf))) + rd, err := o.Input.Stream.Read(buf[:rdCnt]) + if err == io.EOF { + // 输入流不够长度也不报错,只是产生一个空的流 + break + } + if err != nil { + return err + } + o.Offset -= int64(rd) + } + + fut := future.NewSetVoid() + + if o.Length == nil { + o.Output.Stream = io2.AfterEOF(o.Input.Stream, func(closer io.ReadCloser, err error) { + fut.SetVoid() + }) + + sw.PutVars(o.Output) + return fut.Wait(ctx) + } + + o.Output.Stream = io2.AfterEOF(io2.Length(o.Input.Stream, *o.Length), func(closer io.ReadCloser, err error) { + fut.SetVoid() + }) + + sw.PutVars(o.Output) + err = fut.Wait(ctx) + if err != nil { + return err + } + + io2.DropWithBuf(o.Input.Stream, buf) + return nil +} + +func init() { + OpUnion.AddT((*Range)(nil)) +} diff --git a/common/pkgs/ioswitch/ops/sync.go b/common/pkgs/ioswitch/ops/sync.go index c0889b4..a932a46 100644 --- a/common/pkgs/ioswitch/ops/sync.go +++ b/common/pkgs/ioswitch/ops/sync.go @@ -93,6 +93,13 @@ func (w *HoldUntil) Execute(ctx context.Context, sw *ioswitch.Switch) error { return err } + for i := 0; i < len(w.Holds); i++ { + err := ioswitch.AssignVar(w.Holds[i], w.Emits[i]) + if err != nil { + return err + } + } + sw.PutVars(w.Emits...) return nil } diff --git a/common/pkgs/ioswitch/plans/executor.go b/common/pkgs/ioswitch/plans/executor.go index 42c342c..7321a16 100644 --- a/common/pkgs/ioswitch/plans/executor.go +++ b/common/pkgs/ioswitch/plans/executor.go @@ -7,6 +7,7 @@ import ( "sync" "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/utils/io2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) @@ -20,7 +21,14 @@ type Executor struct { executorSw *ioswitch.Switch } +// 开始写入一个流。此函数会将输入视为一个完整的流,因此会给流包装一个Range来获取只需要的部分。 func (e *Executor) BeginWrite(str io.ReadCloser, handle *ExecutorWriteStream) { + handle.Var.Stream = io2.NewRange(str, handle.RangeHint.Offset, handle.RangeHint.Length) + e.executorSw.PutVars(handle.Var) +} + +// 开始写入一个流。此函数默认输入流已经是Handle的RangeHint锁描述的范围,因此不会做任何其他处理 +func (e *Executor) BeginWriteRanged(str io.ReadCloser, handle *ExecutorWriteStream) { handle.Var.Stream = str e.executorSw.PutVars(handle.Var) } @@ -99,7 +107,8 @@ func (e *Executor) stopWith(err error) { } type ExecutorWriteStream struct { - Var *ioswitch.StreamVar + Var *ioswitch.StreamVar + RangeHint *Range } type ExecutorReadStream struct { diff --git a/common/pkgs/ioswitch/plans/fromto.go b/common/pkgs/ioswitch/plans/fromto.go index 2cbb2e7..ba48829 100644 --- a/common/pkgs/ioswitch/plans/fromto.go +++ b/common/pkgs/ioswitch/plans/fromto.go @@ -2,13 +2,16 @@ package plans import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/common/utils/math2" ) type FromTo struct { - Object stgmod.ObjectDetail - Froms []From - Tos []To + Froms []From + Toes []To +} + +func NewFromTo() FromTo { + return FromTo{} } func (ft *FromTo) AddFrom(from From) *FromTo { @@ -17,7 +20,7 @@ func (ft *FromTo) AddFrom(from From) *FromTo { } func (ft *FromTo) AddTo(to To) *FromTo { - ft.Tos = append(ft.Tos, to) + ft.Toes = append(ft.Toes, to) return ft } @@ -28,13 +31,78 @@ type From interface { } type To interface { + // To所需要的文件流的范围。具体含义与DataIndex有关系: + // 如果DataIndex == -1,则表示在整个文件的范围。 + // 如果DataIndex >= 0,则表示在文件的某个分片的范围。 GetRange() Range GetDataIndex() int } type Range struct { Offset int64 - Length int64 + Length *int64 +} + +func (r *Range) Extend(other Range) { + newOffset := math2.Min(r.Offset, other.Offset) + + if r.Length == nil { + r.Offset = newOffset + return + } + + if other.Length == nil { + r.Offset = newOffset + r.Length = nil + return + } + + otherEnd := other.Offset + *other.Length + rEnd := r.Offset + *r.Length + + newEnd := math2.Max(otherEnd, rEnd) + r.Offset = newOffset + *r.Length = newEnd - newOffset +} + +func (r *Range) ExtendStart(start int64) { + r.Offset = math2.Min(r.Offset, start) +} + +func (r *Range) ExtendEnd(end int64) { + if r.Length == nil { + return + } + + rEnd := r.Offset + *r.Length + newLen := math2.Max(end, rEnd) - r.Offset + r.Length = &newLen +} + +func (r *Range) Fix(maxLength int64) { + if r.Length != nil { + return + } + + len := maxLength - r.Offset + r.Length = &len +} + +func (r *Range) ToStartEnd(maxLen int64) (start int64, end int64) { + if r.Length == nil { + return r.Offset, maxLen + } + + end = r.Offset + *r.Length + return r.Offset, end +} + +func (r *Range) ClampLength(maxLen int64) { + if r.Length == nil { + return + } + + *r.Length = math2.Min(*r.Length, maxLen-r.Offset) } type FromExecutor struct { @@ -42,6 +110,16 @@ type FromExecutor struct { DataIndex int } +func NewFromExecutor(dataIndex int) (*FromExecutor, *ExecutorWriteStream) { + handle := &ExecutorWriteStream{ + RangeHint: &Range{}, + } + return &FromExecutor{ + Handle: handle, + DataIndex: dataIndex, + }, handle +} + func (f *FromExecutor) GetDataIndex() int { return f.DataIndex } @@ -53,17 +131,19 @@ func (f *FromExecutor) BuildNode(ft *FromTo) Node { Handle: f.Handle, }, } - op.NewOutput(f.DataIndex) + op.NewOutputStream(f.DataIndex) return op } type FromNode struct { + FileHash string Node *cdssdk.Node DataIndex int } -func NewFromNode(node *cdssdk.Node, dataIndex int) *FromNode { +func NewFromNode(fileHash string, node *cdssdk.Node, dataIndex int) *FromNode { return &FromNode{ + FileHash: fileHash, Node: node, DataIndex: dataIndex, } @@ -87,6 +167,15 @@ func NewToExecutor(dataIndex int) (*ToExecutor, *ExecutorReadStream) { }, &str } +func NewToExecutorWithRange(dataIndex int, rng Range) (*ToExecutor, *ExecutorReadStream) { + str := ExecutorReadStream{} + return &ToExecutor{ + Handle: &str, + DataIndex: dataIndex, + Range: rng, + }, &str +} + func (t *ToExecutor) GetDataIndex() int { return t.DataIndex } @@ -95,26 +184,35 @@ func (t *ToExecutor) GetRange() Range { return t.Range } -type ToAgent struct { +type ToNode struct { Node cdssdk.Node DataIndex int Range Range FileHashStoreKey string } -func NewToAgent(node cdssdk.Node, dataIndex int, fileHashStoreKey string) *ToAgent { - return &ToAgent{ +func NewToNode(node cdssdk.Node, dataIndex int, fileHashStoreKey string) *ToNode { + return &ToNode{ + Node: node, + DataIndex: dataIndex, + FileHashStoreKey: fileHashStoreKey, + } +} + +func NewToNodeWithRange(node cdssdk.Node, dataIndex int, fileHashStoreKey string, rng Range) *ToNode { + return &ToNode{ Node: node, DataIndex: dataIndex, FileHashStoreKey: fileHashStoreKey, + Range: rng, } } -func (t *ToAgent) GetDataIndex() int { +func (t *ToNode) GetDataIndex() int { return t.DataIndex } -func (t *ToAgent) GetRange() Range { +func (t *ToNode) GetRange() Range { return t.Range } diff --git a/common/pkgs/ioswitch/plans/ops.go b/common/pkgs/ioswitch/plans/ops.go index 73babfd..fec029b 100644 --- a/common/pkgs/ioswitch/plans/ops.go +++ b/common/pkgs/ioswitch/plans/ops.go @@ -1,6 +1,8 @@ package plans import ( + "fmt" + "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/ipfs" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" @@ -31,6 +33,7 @@ type ValueVarType int const ( StringValueVar ValueVarType = iota + SignalValueVar ) type ValueVar struct { @@ -83,26 +86,23 @@ type Node struct { OutputValues []*ValueVar } -func (o *Node) NewOutput(dataIndex int) *StreamVar { +func (o *Node) NewOutputStream(dataIndex int) *StreamVar { v := &StreamVar{DataIndex: dataIndex, From: o} o.OutputStreams = append(o.OutputStreams, v) return v } -func (o *Node) AddInput(str *StreamVar) { +func (o *Node) AddInputStream(str *StreamVar) { o.InputStreams = append(o.InputStreams, str) str.AddTo(o) } -func (o *Node) ReplaceInput(org *StreamVar, new *StreamVar) { - idx := lo.IndexOf(o.InputStreams, org) - if idx < 0 { - return - } +func (o *Node) ReplaceInputStream(old *StreamVar, new *StreamVar) { + old.RemoveTo(o) + new.AddTo(o) - o.InputStreams[idx].RemoveTo(o) + idx := lo.IndexOf(o.InputStreams, old) o.InputStreams[idx] = new - new.AddTo(o) } func (o *Node) NewOutputVar(typ ValueVarType) *ValueVar { @@ -116,15 +116,16 @@ func (o *Node) AddInputVar(v *ValueVar) { v.AddTo(o) } -func (o *Node) ReplaceInputVar(org *ValueVar, new *ValueVar) { - idx := lo.IndexOf(o.InputValues, org) - if idx < 0 { - return - } +func (o *Node) ReplaceInputVar(old *ValueVar, new *ValueVar) { + old.RemoveTo(o) + new.AddTo(o) - o.InputValues[idx].RemoveTo(o) + idx := lo.IndexOf(o.InputValues, old) o.InputValues[idx] = new - new.AddTo(o) +} + +func (o *Node) String() string { + return fmt.Sprintf("Node(%T)", o.Type) } type IPFSReadType struct { @@ -143,6 +144,7 @@ func (t *IPFSReadType) GenerateOp(node *Node, blder *PlanBuilder) error { type IPFSWriteType struct { FileHashStoreKey string + Range Range } func (t *IPFSWriteType) GenerateOp(op *Node, blder *PlanBuilder) error { @@ -153,12 +155,12 @@ func (t *IPFSWriteType) GenerateOp(op *Node, blder *PlanBuilder) error { return nil } -type ChunkedSplitOp struct { +type ChunkedSplitType struct { ChunkSize int PaddingZeros bool } -func (t *ChunkedSplitOp) GenerateOp(op *Node, blder *PlanBuilder) error { +func (t *ChunkedSplitType) GenerateOp(op *Node, blder *PlanBuilder) error { addOpByEnv(&ops.ChunkedSplit{ Input: op.InputStreams[0].Var, Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { @@ -170,11 +172,11 @@ func (t *ChunkedSplitOp) GenerateOp(op *Node, blder *PlanBuilder) error { return nil } -type ChunkedJoinOp struct { +type ChunkedJoinType struct { ChunkSize int } -func (t *ChunkedJoinOp) GenerateOp(op *Node, blder *PlanBuilder) error { +func (t *ChunkedJoinType) GenerateOp(op *Node, blder *PlanBuilder) error { addOpByEnv(&ops.ChunkedJoin{ Inputs: lo.Map(op.InputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { return v.Var @@ -185,9 +187,9 @@ func (t *ChunkedJoinOp) GenerateOp(op *Node, blder *PlanBuilder) error { return nil } -type CloneStreamOp struct{} +type CloneStreamType struct{} -func (t *CloneStreamOp) GenerateOp(op *Node, blder *PlanBuilder) error { +func (t *CloneStreamType) GenerateOp(op *Node, blder *PlanBuilder) error { addOpByEnv(&ops.CloneStream{ Input: op.InputStreams[0].Var, Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { @@ -197,9 +199,9 @@ func (t *CloneStreamOp) GenerateOp(op *Node, blder *PlanBuilder) error { return nil } -type CloneVarOp struct{} +type CloneVarType struct{} -func (t *CloneVarOp) GenerateOp(op *Node, blder *PlanBuilder) error { +func (t *CloneVarType) GenerateOp(op *Node, blder *PlanBuilder) error { addOpByEnv(&ops.CloneVar{ Raw: op.InputValues[0].Var, Cloneds: lo.Map(op.OutputValues, func(v *ValueVar, idx int) ioswitch.Var { @@ -210,8 +212,7 @@ func (t *CloneVarOp) GenerateOp(op *Node, blder *PlanBuilder) error { } type MultiplyOp struct { - EC cdssdk.ECRedundancy - ChunkSize int + EC cdssdk.ECRedundancy } func (t *MultiplyOp) GenerateOp(op *Node, blder *PlanBuilder) error { @@ -224,7 +225,7 @@ func (t *MultiplyOp) GenerateOp(op *Node, blder *PlanBuilder) error { outputIdxs = append(outputIdxs, out.DataIndex) } - rs, _ := ec.NewRs(t.EC.K, t.EC.N) + rs, err := ec.NewRs(t.EC.K, t.EC.N) coef, err := rs.GenerateMatrix(inputIdxs, outputIdxs) if err != nil { return err @@ -234,7 +235,7 @@ func (t *MultiplyOp) GenerateOp(op *Node, blder *PlanBuilder) error { Coef: coef, Inputs: lo.Map(op.InputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { return v.Var }), Outputs: lo.Map(op.OutputStreams, func(v *StreamVar, idx int) *ioswitch.StreamVar { return v.Var }), - ChunkSize: t.ChunkSize, + ChunkSize: t.EC.ChunkSize, }, op.Env, blder) return nil } @@ -274,6 +275,7 @@ func (t *FromExecutorOp) GenerateOp(op *Node, blder *PlanBuilder) error { type ToExecutorOp struct { Handle *ExecutorReadStream + Range Range } func (t *ToExecutorOp) GenerateOp(op *Node, blder *PlanBuilder) error { @@ -320,6 +322,7 @@ type GetStreamOp struct{} func (t *GetStreamOp) GenerateOp(op *Node, blder *PlanBuilder) error { fromAgt := op.InputStreams[0].From.Env.(*AgentEnv) addOpByEnv(&ops.GetStream{ + Signal: op.OutputValues[0].Var.(*ioswitch.SignalVar), Output: op.OutputStreams[0].Var, Get: op.InputStreams[0].Var, Node: fromAgt.Node, @@ -344,13 +347,50 @@ type GetVarOp struct{} func (t *GetVarOp) GenerateOp(op *Node, blder *PlanBuilder) error { fromAgt := op.InputValues[0].From.Env.(*AgentEnv) addOpByEnv(&ops.GetVar{ - Output: op.OutputValues[0].Var, + Signal: op.OutputValues[0].Var.(*ioswitch.SignalVar), + Output: op.OutputValues[1].Var, Get: op.InputValues[0].Var, Node: fromAgt.Node, }, op.Env, blder) return nil } +type RangeType struct { + Range Range +} + +func (t *RangeType) GenerateOp(op *Node, blder *PlanBuilder) error { + addOpByEnv(&ops.Range{ + Input: op.InputStreams[0].Var, + Output: op.OutputStreams[0].Var, + Offset: t.Range.Offset, + Length: t.Range.Length, + }, op.Env, blder) + return nil +} + +type HoldUntilOp struct { +} + +func (t *HoldUntilOp) GenerateOp(op *Node, blder *PlanBuilder) error { + o := &ops.HoldUntil{ + Waits: []*ioswitch.SignalVar{op.InputValues[0].Var.(*ioswitch.SignalVar)}, + } + + for i := 0; i < len(op.OutputValues); i++ { + o.Holds = append(o.Holds, op.InputValues[i+1].Var) + o.Emits = append(o.Emits, op.OutputValues[i].Var) + } + + for i := 0; i < len(op.OutputStreams); i++ { + o.Holds = append(o.Holds, op.InputStreams[i].Var) + o.Emits = append(o.Emits, op.OutputStreams[i].Var) + } + + addOpByEnv(o, op.Env, blder) + return nil +} + func addOpByEnv(op ioswitch.Op, env OpEnv, blder *PlanBuilder) { switch env := env.(type) { case *AgentEnv: diff --git a/common/pkgs/ioswitch/plans/parser.go b/common/pkgs/ioswitch/plans/parser.go index 2c0d881..e6f8b01 100644 --- a/common/pkgs/ioswitch/plans/parser.go +++ b/common/pkgs/ioswitch/plans/parser.go @@ -2,9 +2,12 @@ package plans import ( "fmt" + "math" + "gitlink.org.cn/cloudream/common/pkgs/ipfs" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/common/utils/math2" ) type FromToParser interface { @@ -15,9 +18,19 @@ type DefaultParser struct { EC cdssdk.ECRedundancy } +func NewParser(ec cdssdk.ECRedundancy) *DefaultParser { + return &DefaultParser{ + EC: ec, + } +} + type ParseContext struct { - Ft FromTo - Nodes []*Node + Ft FromTo + Nodes []*Node + ToNodes []*Node + // 为了产生所有To所需的数据范围,而需要From打开的范围。 + // 这个范围是基于整个文件的,且上下界都取整到条带大小的整数倍,因此上界是有可能超过文件大小的。 + StreamRange Range } func (p *DefaultParser) Parse(ft FromTo, blder *PlanBuilder) error { @@ -25,6 +38,10 @@ func (p *DefaultParser) Parse(ft FromTo, blder *PlanBuilder) error { // 分成两个阶段: // 1. 基于From和To生成更多指令,初步匹配to的需求 + + // 计算一下打开流的范围 + p.calcStreamRange(&ctx) + err := p.extend(&ctx, ft, blder) if err != nil { return err @@ -80,6 +97,7 @@ func (p *DefaultParser) Parse(ft FromTo, blder *PlanBuilder) error { p.dropUnused(&ctx) p.storeIPFSWriteResult(&ctx) p.generateClone(&ctx) + p.generateRange(&ctx) p.generateSend(&ctx) return p.buildPlan(&ctx, blder) @@ -96,9 +114,44 @@ func (p *DefaultParser) findOutputStream(ctx *ParseContext, dataIndex int) *Stre return nil } +// 计算输入流的打开范围。会把流的范围按条带大小取整 +func (p *DefaultParser) calcStreamRange(ctx *ParseContext) { + stripSize := int64(p.EC.ChunkSize * p.EC.K) + + rng := Range{ + Offset: math.MaxInt64, + } + + for _, to := range ctx.Ft.Toes { + if to.GetDataIndex() == -1 { + toRng := to.GetRange() + rng.ExtendStart(math2.Floor(toRng.Offset, stripSize)) + if toRng.Length != nil { + rng.ExtendEnd(math2.Ceil(toRng.Offset+*toRng.Length, stripSize)) + } else { + rng.Length = nil + } + + } else { + toRng := to.GetRange() + + blkStartIndex := math2.FloorDiv(toRng.Offset, int64(p.EC.ChunkSize)) + rng.ExtendStart(blkStartIndex * stripSize) + if toRng.Length != nil { + blkEndIndex := math2.CeilDiv(toRng.Offset+*toRng.Length, int64(p.EC.ChunkSize)) + rng.ExtendEnd(blkEndIndex * stripSize) + } else { + rng.Length = nil + } + } + } + + ctx.StreamRange = rng +} + func (p *DefaultParser) extend(ctx *ParseContext, ft FromTo, blder *PlanBuilder) error { for _, f := range ft.Froms { - n, err := p.buildFromNode(&ft, f) + n, err := p.buildFromNode(ctx, &ft, f) if err != nil { return err } @@ -108,11 +161,11 @@ func (p *DefaultParser) extend(ctx *ParseContext, ft FromTo, blder *PlanBuilder) if f.GetDataIndex() == -1 { splitOp := &Node{ Env: nil, - Type: &ChunkedSplitOp{ChunkSize: p.EC.ChunkSize, PaddingZeros: true}, + Type: &ChunkedSplitType{ChunkSize: p.EC.ChunkSize, PaddingZeros: true}, } - splitOp.AddInput(n.OutputStreams[0]) + splitOp.AddInputStream(n.OutputStreams[0]) for i := 0; i < p.EC.K; i++ { - splitOp.NewOutput(i) + splitOp.NewOutputStream(i) } ctx.Nodes = append(ctx.Nodes, splitOp) } @@ -134,56 +187,89 @@ loop: if len(ecInputStrs) == p.EC.K { mulOp := &Node{ Env: nil, - Type: &MultiplyOp{ChunkSize: p.EC.ChunkSize}, + Type: &MultiplyOp{EC: p.EC}, } for _, s := range ecInputStrs { - mulOp.AddInput(s) + mulOp.AddInputStream(s) } for i := 0; i < p.EC.N; i++ { - mulOp.NewOutput(i) + mulOp.NewOutputStream(i) } ctx.Nodes = append(ctx.Nodes, mulOp) joinOp := &Node{ Env: nil, - Type: &ChunkedJoinOp{p.EC.ChunkSize}, + Type: &ChunkedJoinType{p.EC.ChunkSize}, } for i := 0; i < p.EC.K; i++ { // 不可能找不到流 - joinOp.AddInput(p.findOutputStream(ctx, i)) + joinOp.AddInputStream(p.findOutputStream(ctx, i)) } - joinOp.NewOutput(-1) + joinOp.NewOutputStream(-1) + ctx.Nodes = append(ctx.Nodes, joinOp) } // 为每一个To找到一个输入流 - for _, t := range ft.Tos { + for _, t := range ft.Toes { n, err := p.buildToNode(&ft, t) if err != nil { return err } ctx.Nodes = append(ctx.Nodes, n) + ctx.ToNodes = append(ctx.ToNodes, n) str := p.findOutputStream(ctx, t.GetDataIndex()) if str == nil { return fmt.Errorf("no output stream found for data index %d", t.GetDataIndex()) } - n.AddInput(str) + n.AddInputStream(str) } return nil } -func (p *DefaultParser) buildFromNode(ft *FromTo, f From) (*Node, error) { +func (p *DefaultParser) buildFromNode(ctx *ParseContext, ft *FromTo, f From) (*Node, error) { + var repRange Range + var blkRange Range + + repRange.Offset = ctx.StreamRange.Offset + blkRange.Offset = ctx.StreamRange.Offset / int64(p.EC.ChunkSize*p.EC.K) * int64(p.EC.ChunkSize) + if ctx.StreamRange.Length != nil { + repRngLen := *ctx.StreamRange.Length + repRange.Length = &repRngLen + + blkRngLen := *ctx.StreamRange.Length / int64(p.EC.ChunkSize*p.EC.K) * int64(p.EC.ChunkSize) + blkRange.Length = &blkRngLen + } + switch f := f.(type) { case *FromNode: + ty := &IPFSReadType{ + FileHash: f.FileHash, + Option: ipfs.ReadOption{ + Offset: 0, + Length: -1, + }, + } + if f.DataIndex == -1 { + ty.Option.Offset = repRange.Offset + if repRange.Length != nil { + ty.Option.Length = *repRange.Length + } + } else { + ty.Option.Offset = blkRange.Offset + if blkRange.Length != nil { + ty.Option.Length = *blkRange.Length + } + } + n := &Node{ - // TODO2 需要FromTo的Range来设置Option - Type: &IPFSReadType{FileHash: ft.Object.Object.FileHash}, + Type: ty, } - n.NewOutput(f.DataIndex) + n.NewOutputStream(f.DataIndex) if f.Node != nil { n.Env = &AgentEnv{Node: *f.Node} @@ -196,7 +282,16 @@ func (p *DefaultParser) buildFromNode(ft *FromTo, f From) (*Node, error) { Env: &ExecutorEnv{}, Type: &FromExecutorOp{Handle: f.Handle}, } - n.NewOutput(f.DataIndex) + n.NewOutputStream(f.DataIndex) + + if f.DataIndex == -1 { + f.Handle.RangeHint.Offset = repRange.Offset + f.Handle.RangeHint.Length = repRange.Length + } else { + f.Handle.RangeHint.Offset = blkRange.Offset + f.Handle.RangeHint.Length = blkRange.Length + } + return n, nil default: @@ -206,16 +301,19 @@ func (p *DefaultParser) buildFromNode(ft *FromTo, f From) (*Node, error) { func (p *DefaultParser) buildToNode(ft *FromTo, t To) (*Node, error) { switch t := t.(type) { - case *ToAgent: - return &Node{ + case *ToNode: + n := &Node{ Env: &AgentEnv{t.Node}, - Type: &IPFSWriteType{FileHashStoreKey: t.FileHashStoreKey}, - }, nil + Type: &IPFSWriteType{FileHashStoreKey: t.FileHashStoreKey, Range: t.Range}, + } + n.NewOutputVar(StringValueVar) + + return n, nil case *ToExecutor: return &Node{ Env: &ExecutorEnv{}, - Type: &ToExecutorOp{Handle: t.Handle}, + Type: &ToExecutorOp{Handle: t.Handle, Range: t.Range}, }, nil default: @@ -227,7 +325,7 @@ func (p *DefaultParser) buildToNode(ft *FromTo, t To) (*Node, error) { func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool { opted := false for i, op := range ctx.Nodes { - _, ok := op.Type.(*ChunkedJoinOp) + _, ok := op.Type.(*ChunkedJoinType) if !ok { continue } @@ -263,6 +361,7 @@ func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool { } op.OutputStreams[i2] = nil + opted = true } op.OutputStreams = lo2.RemoveAllDefault(op.OutputStreams) @@ -272,9 +371,8 @@ func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool { } ctx.Nodes[i] = nil + opted = true } - - opted = true } ctx.Nodes = lo2.RemoveAllDefault(ctx.Nodes) @@ -285,7 +383,7 @@ func (p *DefaultParser) removeUnusedMultiplyOutput(ctx *ParseContext) bool { func (p *DefaultParser) removeUnusedSplit(ctx *ParseContext) bool { opted := false for i, op := range ctx.Nodes { - _, ok := op.Type.(*ChunkedSplitOp) + _, ok := op.Type.(*ChunkedSplitType) if !ok { continue } @@ -320,7 +418,7 @@ loop: continue } - _, ok := splitOp.Type.(*ChunkedSplitOp) + _, ok := splitOp.Type.(*ChunkedSplitType) if !ok { continue } @@ -344,7 +442,7 @@ loop: } // 且这个目的地要是一个Join指令 - _, ok = joinOp.Type.(*ChunkedJoinOp) + _, ok = joinOp.Type.(*ChunkedJoinType) if !ok { continue } @@ -358,8 +456,8 @@ loop: // 所有条件都满足,可以开始省略操作,将Join操作的目的地的输入流替换为Split操作的输入流: // F->Split->Join->T 变换为:F->T splitOp.InputStreams[0].RemoveTo(splitOp) - for _, to := range joinOp.OutputStreams[0].Toes { - to.ReplaceInput(joinOp.OutputStreams[0], splitOp.InputStreams[0]) + for i := len(joinOp.OutputStreams[0].Toes) - 1; i >= 0; i-- { + joinOp.OutputStreams[0].Toes[i].ReplaceInputStream(joinOp.OutputStreams[0], splitOp.InputStreams[0]) } // 并删除这两个指令 @@ -376,7 +474,7 @@ loop: func (p *DefaultParser) pinSplit(ctx *ParseContext) bool { opted := false for _, op := range ctx.Nodes { - _, ok := op.Type.(*ChunkedSplitOp) + _, ok := op.Type.(*ChunkedSplitType) if !ok { continue } @@ -435,7 +533,7 @@ func (p *DefaultParser) pinSplit(ctx *ParseContext) bool { func (p *DefaultParser) pinJoin(ctx *ParseContext) bool { opted := false for _, op := range ctx.Nodes { - _, ok := op.Type.(*ChunkedJoinOp) + _, ok := op.Type.(*ChunkedJoinType) if !ok { continue } @@ -603,10 +701,10 @@ func (p *DefaultParser) dropUnused(ctx *ParseContext) { for _, out := range op.OutputStreams { if len(out.Toes) == 0 { dropOp := &Node{ - Env: nil, + Env: op.Env, Type: &DropOp{}, } - dropOp.AddInput(out) + dropOp.AddInputStream(out) ctx.Nodes = append(ctx.Nodes, dropOp) } } @@ -636,6 +734,41 @@ func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) { } } +// 生成Range指令。StreamRange可能超过文件总大小,但Range指令会在数据量不够时不报错而是正常返回 +func (p *DefaultParser) generateRange(ctx *ParseContext) { + for i, to := range ctx.ToNodes { + toDataIdx := ctx.Ft.Toes[i].GetDataIndex() + toRng := ctx.Ft.Toes[i].GetRange() + + if toDataIdx == -1 { + rngType := &RangeType{Range: Range{Offset: toRng.Offset - ctx.StreamRange.Offset, Length: toRng.Length}} + rngNode := &Node{ + Env: to.InputStreams[0].From.Env, + Type: rngType, + } + rngNode.AddInputStream(to.InputStreams[0]) + + to.ReplaceInputStream(to.InputStreams[0], rngNode.NewOutputStream(toDataIdx)) + ctx.Nodes = append(ctx.Nodes, rngNode) + } else { + stripSize := int64(p.EC.ChunkSize * p.EC.K) + blkStartIdx := ctx.StreamRange.Offset / stripSize + + blkStart := blkStartIdx * int64(p.EC.ChunkSize) + + rngType := &RangeType{Range: Range{Offset: toRng.Offset - blkStart, Length: toRng.Length}} + rngNode := &Node{ + Env: to.InputStreams[0].From.Env, + Type: rngType, + } + rngNode.AddInputStream(to.InputStreams[0]) + + to.ReplaceInputStream(to.InputStreams[0], rngNode.NewOutputStream(toDataIdx)) + ctx.Nodes = append(ctx.Nodes, rngNode) + } + } +} + // 生成Clone指令 func (p *DefaultParser) generateClone(ctx *ParseContext) { for _, op := range ctx.Nodes { @@ -646,13 +779,13 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) { cloneOp := &Node{ Env: op.Env, - Type: &CloneStreamOp{}, + Type: &CloneStreamType{}, } - for _, to := range out.Toes { - to.ReplaceInput(out, cloneOp.NewOutput(out.DataIndex)) + for i := len(out.Toes) - 1; i >= 0; i-- { + out.Toes[i].ReplaceInputStream(out, cloneOp.NewOutputStream(out.DataIndex)) } out.Toes = nil - cloneOp.AddInput(out) + cloneOp.AddInputStream(out) ctx.Nodes = append(ctx.Nodes, cloneOp) } @@ -663,13 +796,14 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) { cloneOp := &Node{ Env: op.Env, - Type: &CloneVarOp{}, + Type: &CloneVarType{}, } - for _, to := range out.Toes { - to.ReplaceInputVar(out, cloneOp.NewOutputVar(out.Type)) + for i := len(out.Toes) - 1; i >= 0; i-- { + out.Toes[i].ReplaceInputVar(out, cloneOp.NewOutputVar(out.Type)) } out.Toes = nil cloneOp.AddInputVar(out) + ctx.Nodes = append(ctx.Nodes, cloneOp) } } } @@ -678,21 +812,31 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) { func (p *DefaultParser) generateSend(ctx *ParseContext) { for _, op := range ctx.Nodes { for _, out := range op.OutputStreams { - to := out.Toes[0] - if to.Env.Equals(op.Env) { + toOp := out.Toes[0] + if toOp.Env.Equals(op.Env) { continue } - switch to.Env.(type) { + switch toOp.Env.(type) { case *ExecutorEnv: // 如果是要送到Executor,则只能由Executor主动去拉取 getStrOp := &Node{ Env: &ExecutorEnv{}, Type: &GetStreamOp{}, } - out.Toes = nil - getStrOp.AddInput(out) - to.ReplaceInput(out, getStrOp.NewOutput(out.DataIndex)) + + // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 + holdOp := &Node{ + Env: op.Env, + Type: &HoldUntilOp{}, + } + holdOp.AddInputVar(getStrOp.NewOutputVar(SignalValueVar)) + holdOp.AddInputStream(out) + + getStrOp.AddInputStream(holdOp.NewOutputStream(out.DataIndex)) + toOp.ReplaceInputStream(out, getStrOp.NewOutputStream(out.DataIndex)) + + ctx.Nodes = append(ctx.Nodes, holdOp) ctx.Nodes = append(ctx.Nodes, getStrOp) case *AgentEnv: @@ -702,29 +846,39 @@ func (p *DefaultParser) generateSend(ctx *ParseContext) { Type: &SendStreamOp{}, } out.Toes = nil - sendStrOp.AddInput(out) - to.ReplaceInput(out, sendStrOp.NewOutput(out.DataIndex)) + sendStrOp.AddInputStream(out) + toOp.ReplaceInputStream(out, sendStrOp.NewOutputStream(out.DataIndex)) ctx.Nodes = append(ctx.Nodes, sendStrOp) } } for _, out := range op.OutputValues { - to := out.Toes[0] - if to.Env.Equals(op.Env) { + toOp := out.Toes[0] + if toOp.Env.Equals(op.Env) { continue } - switch to.Env.(type) { + switch toOp.Env.(type) { case *ExecutorEnv: // 如果是要送到Executor,则只能由Executor主动去拉取 - getVarOp := &Node{ + getStrOp := &Node{ Env: &ExecutorEnv{}, Type: &GetVarOp{}, } - out.Toes = nil - getVarOp.AddInputVar(out) - to.ReplaceInputVar(out, getVarOp.NewOutputVar(out.Type)) - ctx.Nodes = append(ctx.Nodes, getVarOp) + + // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达 + holdOp := &Node{ + Env: op.Env, + Type: &HoldUntilOp{}, + } + holdOp.AddInputVar(getStrOp.NewOutputVar(SignalValueVar)) + holdOp.AddInputVar(out) + + getStrOp.AddInputVar(holdOp.NewOutputVar(out.Type)) + toOp.ReplaceInputVar(out, getStrOp.NewOutputVar(out.Type)) + + ctx.Nodes = append(ctx.Nodes, holdOp) + ctx.Nodes = append(ctx.Nodes, getStrOp) case *AgentEnv: // 如果是要送到Agent,则可以直接发送 @@ -734,7 +888,7 @@ func (p *DefaultParser) generateSend(ctx *ParseContext) { } out.Toes = nil sendVarOp.AddInputVar(out) - to.ReplaceInputVar(out, sendVarOp.NewOutputVar(out.Type)) + toOp.ReplaceInputVar(out, sendVarOp.NewOutputVar(out.Type)) ctx.Nodes = append(ctx.Nodes, sendVarOp) } } @@ -768,6 +922,8 @@ func (p *DefaultParser) buildPlan(ctx *ParseContext, blder *PlanBuilder) error { switch out.Type { case StringValueVar: out.Var = blder.NewStringVar() + case SignalValueVar: + out.Var = blder.NewSignalVar() } } @@ -780,6 +936,8 @@ func (p *DefaultParser) buildPlan(ctx *ParseContext, blder *PlanBuilder) error { switch in.Type { case StringValueVar: in.Var = blder.NewStringVar() + case SignalValueVar: + in.Var = blder.NewSignalVar() } } diff --git a/common/pkgs/ioswitch/switch.go b/common/pkgs/ioswitch/switch.go index 1dceeab..f077a00 100644 --- a/common/pkgs/ioswitch/switch.go +++ b/common/pkgs/ioswitch/switch.go @@ -48,7 +48,7 @@ func (s *Switch) Run(ctx context.Context) error { if err != nil { cancel() - return err + return fmt.Errorf("%T: %w", o, err) } return nil diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index 747c909..81b72f7 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -413,14 +413,19 @@ func (t *CheckPackageRedundancy) noneToEC(obj stgmod.ObjectDetail, red *cdssdk.E return nil, fmt.Errorf("requesting to get nodes: %w", err) } - planBlder := plans.NewPlanBuilder() - inputStrs := planBlder.AtAgent(getNodes.Nodes[0]).IPFSRead(obj.Object.FileHash).ChunkedSplit(red.ChunkSize, red.K, true) - outputStrs := planBlder.AtAgent(getNodes.Nodes[0]).ECReconstructAny(*red, lo.Range(red.K), lo.Range(red.N), inputStrs) + ft := plans.NewFromTo() + ft.AddFrom(plans.NewFromNode(obj.Object.FileHash, &getNodes.Nodes[0], -1)) for i := 0; i < red.N; i++ { - outputStrs[i].To(uploadNodes[i].Node).IPFSWrite().ToExecutor().Store(fmt.Sprintf("%d", i)) + ft.AddTo(plans.NewToNode(uploadNodes[i].Node, i, fmt.Sprintf("%d", i))) + } + parser := plans.NewParser(*red) + plans := plans.NewPlanBuilder() + err = parser.Parse(ft, plans) + if err != nil { + return nil, fmt.Errorf("parsing plan: %w", err) } - ioRet, err := planBlder.Execute().Wait(context.TODO()) + ioRet, err := plans.Execute().Wait(context.TODO()) if err != nil { return nil, fmt.Errorf("executing io plan: %w", err) } @@ -510,17 +515,25 @@ func (t *CheckPackageRedundancy) ecToRep(obj stgmod.ObjectDetail, srcRed *cdssdk uploadNodes = lo.UniqBy(uploadNodes, func(item *NodeLoadInfo) cdssdk.NodeID { return item.Node.NodeID }) // 每个被选节点都在自己节点上重建原始数据 + parser := plans.NewParser(*srcRed) planBlder := plans.NewPlanBuilder() for i := range uploadNodes { - tarNode := planBlder.AtAgent(uploadNodes[i].Node) + ft := plans.NewFromTo() - var inputs []*plans.AgentStreamVar for _, block := range chosenBlocks { - inputs = append(inputs, tarNode.IPFSRead(block.FileHash)) + ft.AddFrom(plans.NewFromNode(block.FileHash, &uploadNodes[i].Node, block.Index)) } - outputs := tarNode.ECReconstruct(*srcRed, chosenBlockIndexes, inputs) - tarNode.ChunkedJoin(srcRed.ChunkSize, outputs).Length(obj.Object.Size).IPFSWrite().ToExecutor().Store(fmt.Sprintf("%d", i)) + len := obj.Object.Size + ft.AddTo(plans.NewToNodeWithRange(uploadNodes[i].Node, -1, fmt.Sprintf("%d", i), plans.Range{ + Offset: 0, + Length: &len, + })) + + err := parser.Parse(ft, planBlder) + if err != nil { + return nil, fmt.Errorf("parsing plan: %w", err) + } } ioRet, err := planBlder.Execute().Wait(context.TODO()) @@ -555,11 +568,9 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk. grpBlocks := obj.GroupBlocks() var chosenBlocks []stgmod.GrouppedObjectBlock - var chosenBlockIndexes []int for _, block := range grpBlocks { if len(block.NodeIDs) > 0 { chosenBlocks = append(chosenBlocks, block) - chosenBlockIndexes = append(chosenBlockIndexes, block.Index) } if len(chosenBlocks) == srcRed.K { @@ -572,6 +583,7 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk. } // 目前EC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块 + parser := plans.NewParser(*srcRed) planBlder := plans.NewPlanBuilder() var newBlocks []stgmod.ObjectBlock @@ -595,15 +607,20 @@ func (t *CheckPackageRedundancy) ecToEC(obj stgmod.ObjectDetail, srcRed *cdssdk. shouldUpdateBlocks = true // 否则就要重建出这个节点需要的块 - tarNode := planBlder.AtAgent(node.Node) - var inputs []*plans.AgentStreamVar + ft := plans.NewFromTo() for _, block := range chosenBlocks { - inputs = append(inputs, tarNode.IPFSRead(block.FileHash)) + ft.AddFrom(plans.NewFromNode(block.FileHash, &node.Node, block.Index)) } // 输出只需要自己要保存的那一块 - tarNode.ECReconstructAny(*srcRed, chosenBlockIndexes, []int{i}, inputs)[0].IPFSWrite().ToExecutor().Store(fmt.Sprintf("%d", i)) + ft.AddTo(plans.NewToNode(node.Node, i, fmt.Sprintf("%d", i))) + + err := parser.Parse(ft, planBlder) + if err != nil { + return nil, fmt.Errorf("parsing plan: %w", err) + } + newBlocks = append(newBlocks, newBlock) } diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go index 664acb5..a20ef87 100644 --- a/scanner/internal/event/clean_pinned.go +++ b/scanner/internal/event/clean_pinned.go @@ -781,14 +781,20 @@ func (t *CleanPinned) makePlansForECObject(allNodeInfos map[cdssdk.NodeID]*cdssd } ecRed := obj.Object.Redundancy.(*cdssdk.ECRedundancy) + parser := plans.NewParser(*ecRed) for id, idxs := range reconstrct { - agt := planBld.AtAgent(*allNodeInfos[id]) + ft := plans.NewFromTo() + ft.AddFrom(plans.NewFromNode(obj.Object.FileHash, allNodeInfos[id], -1)) - strs := agt.IPFSRead(obj.Object.FileHash).ChunkedSplit(ecRed.ChunkSize, ecRed.K, true) - ss := agt.ECReconstructAny(*ecRed, lo.Range(ecRed.K), *idxs, strs) - for i, s := range ss { - s.IPFSWrite().ToExecutor().Store(fmt.Sprintf("%d.%d", obj.Object.ObjectID, (*idxs)[i])) + for _, i := range *idxs { + ft.AddTo(plans.NewToNode(*allNodeInfos[id], i, fmt.Sprintf("%d.%d", obj.Object.ObjectID, i))) + } + + err := parser.Parse(ft, planBld) + if err != nil { + // TODO 错误处理 + continue } planningNodeIDs[id] = true