From 2d6e484957adc6a7f27bdcd2675e3fb01d97a2d6 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 22 Nov 2023 09:45:50 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96EC=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/ec/rs_test.go | 22 ++++--- common/pkgs/ec/stream_rs.go | 76 ++++++++-------------- common/pkgs/ioswitch/ops/ops.go | 45 +++++++++++-- common/pkgs/ioswitch/plans/plan_builder.go | 3 +- 4 files changed, 83 insertions(+), 63 deletions(-) diff --git a/common/pkgs/ec/rs_test.go b/common/pkgs/ec/rs_test.go index 415cd24..26db54c 100644 --- a/common/pkgs/ec/rs_test.go +++ b/common/pkgs/ec/rs_test.go @@ -185,16 +185,18 @@ func print_ioreaders(t *testing.T, readers []io.ReadCloser, chunkSize int64) { } func test_reconstructData(t *testing.T) { - blkReader, _ := NewBlockReader() - defer blkReader.Close() - hashs := []string{"QmS2t7xFgTMTX2DGYsbDdmHnGvaG6sc7D9k1R2WZyuDx56", "QmUSZvuABjfGKF1c4VxvVBdH31SroDm2QyLGBrVFomRM8P", "QmcD3RpUh5rwMhf9yBywBeT6ibT1P5DSJC67aoD77jhTBn"} - dataBlocks, _ := blkReader.FetchBLocks(hashs) - chunkSize := int64(6) - enc, _ := NewRs(3, 5, chunkSize) - print("@@@@@@@@@") - newDataBlocks, _ := enc.ReconstructSome(dataBlocks, []int{0, 1, 2}, []int{3, 4}) - print("!!!!!!!!!") - print_ioreaders(t, newDataBlocks, chunkSize) + /* + blkReader, _ := NewBlockReader() + defer blkReader.Close() + hashs := []string{"QmS2t7xFgTMTX2DGYsbDdmHnGvaG6sc7D9k1R2WZyuDx56", "QmUSZvuABjfGKF1c4VxvVBdH31SroDm2QyLGBrVFomRM8P", "QmcD3RpUh5rwMhf9yBywBeT6ibT1P5DSJC67aoD77jhTBn"} + dataBlocks, _ := blkReader.FetchBLocks(hashs) + chunkSize := int64(6) + enc, _ := NewRs(3, 5, chunkSize) + print("@@@@@@@@@") + newDataBlocks, _ := enc.ReconstructSome(dataBlocks, []int{0, 1, 2}, []int{3, 4}) + print("!!!!!!!!!") + print_ioreaders(t, newDataBlocks, chunkSize) + */ } func Test_main(t *testing.T) { //test_Encode(t) diff --git a/common/pkgs/ec/stream_rs.go b/common/pkgs/ec/stream_rs.go index 02402fb..c683468 100644 --- a/common/pkgs/ec/stream_rs.go +++ b/common/pkgs/ec/stream_rs.go @@ -4,6 +4,7 @@ import ( "io" "github.com/klauspost/reedsolomon" + myio "gitlink.org.cn/cloudream/common/utils/io" ) type Rs struct { @@ -140,78 +141,57 @@ func (r *Rs) ReconstructData(input []io.ReadCloser, inBlockIdx []int) ([]io.Read return dataReader, nil } -// 修复,任意k个块恢复若干想要的块 -func (r *Rs) ReconstructSome(input []io.ReadCloser, inBlockIdx []int, outBlockIdx []int) ([]io.ReadCloser, error) { - outReader := make([]io.ReadCloser, len(outBlockIdx)) - outWriter := make([]*io.PipeWriter, len(outBlockIdx)) +// 修复,任意k个块恢复若干想要的块。调用者应该保证input的每一个流长度相同,且均为chunkSize的整数倍 +func (r *Rs) ReconstructSome(input []io.Reader, inBlockIdx []int, outBlockIdx []int) ([]io.ReadCloser, error) { + outReaders := make([]io.ReadCloser, len(outBlockIdx)) + outWriters := make([]*io.PipeWriter, len(outBlockIdx)) for i := 0; i < len(outBlockIdx); i++ { - var reader *io.PipeReader - reader, outWriter[i] = io.Pipe() - outReader[i] = reader + outReaders[i], outWriters[i] = io.Pipe() } + go func() { chunks := make([][]byte, r.ecN) - for i := range chunks { - chunks[i] = make([]byte, r.chunkSize) + for _, idx := range inBlockIdx { + chunks[idx] = make([]byte, r.chunkSize) } - finished := false + //outBools:要输出的若干块idx outBools := make([]bool, r.ecN) - for i := range outBools { - outBools[i] = false - } - for i := range outBlockIdx { - outBools[outBlockIdx[i]] = true - } - constructIdx := make([]bool, r.ecN) - for i := 0; i < r.ecN; i++ { - constructIdx[i] = false - } - for i := 0; i < r.ecK; i++ { - constructIdx[inBlockIdx[i]] = true - } - //nil Idx就是没有输入的块idx,要置成nil - nilIdx := make([]int, r.ecP) - ct := 0 - for i := 0; i < r.ecN; i++ { - if !constructIdx[i] { - nilIdx[ct] = i - ct++ - } + for _, idx := range outBlockIdx { + outBools[idx] = true } + var closeErr error + loop: for { //读块到buff for i := 0; i < r.ecK; i++ { - _, err := input[i].Read(chunks[inBlockIdx[i]]) + _, err := io.ReadFull(input[i], chunks[inBlockIdx[i]]) if err != nil { - finished = true - break + closeErr = err + break loop } } - for i := 0; i < r.ecP; i++ { - chunks[nilIdx[i]] = nil - } - if finished { - break - } - //解码 err := r.encoder.ReconstructSome(chunks, outBools) if err != nil { return } + //输出到outWriter for i := range outBlockIdx { - outWriter[i].Write(chunks[outBlockIdx[i]]) + err := myio.WriteAll(outWriters[i], chunks[outBlockIdx[i]]) + if err != nil { + closeErr = err + break loop + } } } - for i := range input { - input[i].Close() - } - for i := range outWriter { - outWriter[i].Close() + + for i := range outWriters { + outWriters[i].CloseWithError(closeErr) } }() - return outReader, nil + + return outReaders, nil } diff --git a/common/pkgs/ioswitch/ops/ops.go b/common/pkgs/ioswitch/ops/ops.go index fc1fed1..5ac5a17 100644 --- a/common/pkgs/ioswitch/ops/ops.go +++ b/common/pkgs/ioswitch/ops/ops.go @@ -10,6 +10,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/types" myio "gitlink.org.cn/cloudream/common/utils/io" + mymath "gitlink.org.cn/cloudream/common/utils/math" "gitlink.org.cn/cloudream/common/utils/serder" stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" @@ -186,7 +187,7 @@ func (o *ECCompute) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { } }() - var inputs []io.ReadCloser + var inputs []io.Reader for _, s := range strs { inputs = append(inputs, s.Stream) } @@ -211,6 +212,7 @@ func (o *ECCompute) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { type Combine struct { InputIDs []ioswitch.StreamID `json:"inputIDs"` OutputID ioswitch.StreamID `json:"outputID"` + Length int64 `json:"length"` } func (o *Combine) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { @@ -218,16 +220,51 @@ func (o *Combine) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { if err != nil { return err } + defer func() { + for _, str := range strs { + str.Stream.Close() + } + }() + + length := o.Length pr, pw := io.Pipe() sw.StreamReady(planID, ioswitch.NewStream(o.OutputID, pr)) + buf := make([]byte, 4096) for _, str := range strs { - _, err := io.Copy(pw, str.Stream) - if err != nil { - return err + for { + bufLen := mymath.Min(length, int64(len(buf))) + if bufLen == 0 { + return nil + } + + rd, err := str.Stream.Read(buf[:bufLen]) + if err != nil { + if err != io.EOF { + return err + } + + length -= int64(rd) + err = myio.WriteAll(pw, buf[:rd]) + if err != nil { + return err + } + + break + } + + length -= int64(rd) + err = myio.WriteAll(pw, buf[:rd]) + if err != nil { + return err + } } } + if length > 0 { + return fmt.Errorf("want %d bytes, but only get %d bytes", o.Length, o.Length-length) + } + return nil } diff --git a/common/pkgs/ioswitch/plans/plan_builder.go b/common/pkgs/ioswitch/plans/plan_builder.go index 1d32896..9caea68 100644 --- a/common/pkgs/ioswitch/plans/plan_builder.go +++ b/common/pkgs/ioswitch/plans/plan_builder.go @@ -189,7 +189,7 @@ func (b *AgentPlanBuilder) ECCompute(ec stgmod.EC, inBlockIndexes []int, outBloc return mstr } -func (b *AgentPlanBuilder) Combine(streams ...*AgentStream) *AgentStream { +func (b *AgentPlanBuilder) Combine(length int64, streams ...*AgentStream) *AgentStream { agtStr := &AgentStream{ owner: b, info: b.owner.newStream(), @@ -203,6 +203,7 @@ func (b *AgentPlanBuilder) Combine(streams ...*AgentStream) *AgentStream { b.ops = append(b.ops, &ops.Combine{ InputIDs: inputStrIDs, OutputID: agtStr.info.ID, + Length: length, }) return agtStr