| @@ -17,17 +17,9 @@ create table Node ( | |||||
| ) comment = '节点表'; | ) comment = '节点表'; | ||||
| insert into | insert into | ||||
| Node ( | |||||
| NodeID, | |||||
| Name, | |||||
| LocalIP, | |||||
| ExternalIP, | |||||
| LocationID, | |||||
| State | |||||
| ) | |||||
| Node (NodeID, Name, LocalIP, ExternalIP, LocalGRPCPort, ExternalGRPCPort, LocationID, State) | |||||
| values | values | ||||
| (0, "LocalNode", "localhost", "localhost", 0, 1); | |||||
| (1, "localhost", "localhost", "localhost", 5010, 5010, 1, "alive") | |||||
| create table Storage ( | create table Storage ( | ||||
| StorageID int not null auto_increment primary key comment '存储服务ID', | StorageID int not null auto_increment primary key comment '存储服务ID', | ||||
| Name varchar(100) not null comment '存储服务名称', | Name varchar(100) not null comment '存储服务名称', | ||||
| @@ -0,0 +1,190 @@ | |||||
| package ec | |||||
| import ( | |||||
| "errors" | |||||
| "io" | |||||
| "io/ioutil" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ipfs" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| stgglb "gitlink.org.cn/cloudream/storage/common/globals" | |||||
| ) | |||||
| type BlockReader struct { | |||||
| ipfsCli *ipfs.PoolClient | |||||
| /*将文件分块相关的属性*/ | |||||
| //fileHash | |||||
| fileHash string | |||||
| //fileSize | |||||
| fileSize int64 | |||||
| //ecK将文件的分块数 | |||||
| ecK int | |||||
| //chunkSize | |||||
| chunkSize int64 | |||||
| /*可选项*/ | |||||
| //fastRead,true的时候直接通过hash读block | |||||
| jumpReadOpt bool | |||||
| } | |||||
| func NewBlockReader() (*BlockReader, error) { | |||||
| ipfsClient, err := stgglb.IPFSPool.Acquire() | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| //default:fast模式,通过hash直接获取 | |||||
| return &BlockReader{ipfsCli: ipfsClient, chunkSize: 256 * 1024, jumpReadOpt: false}, nil | |||||
| } | |||||
| func (r *BlockReader) Close() { | |||||
| r.ipfsCli.Close() | |||||
| } | |||||
| func (r *BlockReader) SetJumpRead(fileHash string, fileSize int64, ecK int) { | |||||
| r.fileHash = fileHash | |||||
| r.fileSize = fileSize | |||||
| r.ecK = ecK | |||||
| r.jumpReadOpt = true | |||||
| } | |||||
| func (r *BlockReader) SetchunkSize(size int64) { | |||||
| r.chunkSize = size | |||||
| } | |||||
| func (r *BlockReader) FetchBLock(blockHash string) (io.ReadCloser, error) { | |||||
| return r.ipfsCli.OpenRead(blockHash) | |||||
| } | |||||
| func (r *BlockReader) FetchBLocks(blockHashs []string) ([]io.ReadCloser, error) { | |||||
| readers := make([]io.ReadCloser, len(blockHashs)) | |||||
| for i, hash := range blockHashs { | |||||
| var err error | |||||
| readers[i], err = r.ipfsCli.OpenRead(hash) | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| } | |||||
| return readers, nil | |||||
| } | |||||
| func (r *BlockReader) JumpFetchBlock(innerID int) (io.ReadCloser, error) { | |||||
| if !r.jumpReadOpt { | |||||
| return nil, nil | |||||
| } | |||||
| pipeReader, pipeWriter := io.Pipe() | |||||
| go func() { | |||||
| for i := int64(r.chunkSize * int64(innerID)); i < r.fileSize; i += int64(r.ecK) * r.chunkSize { | |||||
| reader, err := r.ipfsCli.OpenRead(r.fileHash, ipfs.ReadOption{Offset: i, Length: r.chunkSize}) | |||||
| if err != nil { | |||||
| pipeWriter.CloseWithError(err) | |||||
| return | |||||
| } | |||||
| data, err := ioutil.ReadAll(reader) | |||||
| if err != nil { | |||||
| pipeWriter.CloseWithError(err) | |||||
| return | |||||
| } | |||||
| reader.Close() | |||||
| _, err = pipeWriter.Write(data) | |||||
| if err != nil { | |||||
| pipeWriter.CloseWithError(err) | |||||
| return | |||||
| } | |||||
| } | |||||
| //如果文件大小不是分块的整数倍,可能需要补0 | |||||
| if r.fileSize%(r.chunkSize*int64(r.ecK)) != 0 { | |||||
| //pktNum_1:chunkNum-1 | |||||
| pktNum_1 := r.fileSize / (r.chunkSize * int64(r.ecK)) | |||||
| offset := (r.fileSize - int64(pktNum_1)*int64(r.ecK)*r.chunkSize) | |||||
| count0 := int64(innerID)*int64(r.ecK)*r.chunkSize - offset | |||||
| if count0 > 0 { | |||||
| add0 := make([]byte, count0) | |||||
| pipeWriter.Write(add0) | |||||
| } | |||||
| } | |||||
| pipeWriter.Close() | |||||
| }() | |||||
| return pipeReader, nil | |||||
| } | |||||
| // FetchBlock1这个函数废弃了 | |||||
| func (r *BlockReader) FetchBlock1(input interface{}, errMsg chan error) (io.ReadCloser, error) { | |||||
| /*两种模式下传入第一个参数,但是input的类型不同: | |||||
| jumpReadOpt-》true:传入blcokHash, string型,通过哈希直接读 | |||||
| jumpReadOpt->false: 传入innerID,int型,选择需要获取的数据块的id | |||||
| */ | |||||
| var innerID int | |||||
| var blockHash string | |||||
| switch input.(type) { | |||||
| case int: | |||||
| // 执行针对整数的逻辑分支 | |||||
| if r.jumpReadOpt { | |||||
| return nil, errors.New("conflict, wrong input type and jumpReadOpt:true") | |||||
| } else { | |||||
| innerID = input.(int) | |||||
| } | |||||
| case string: | |||||
| if !r.jumpReadOpt { | |||||
| return nil, errors.New("conflict, wrong input type and jumpReadOpt:false") | |||||
| } else { | |||||
| blockHash = input.(string) | |||||
| } | |||||
| default: | |||||
| return nil, errors.New("wrong input type") | |||||
| } | |||||
| //开始执行 | |||||
| if r.jumpReadOpt { //快速读 | |||||
| ipfsCli, err := stgglb.IPFSPool.Acquire() | |||||
| if err != nil { | |||||
| logger.Warnf("new ipfs client: %s", err.Error()) | |||||
| return nil, err | |||||
| } | |||||
| defer ipfsCli.Close() | |||||
| return ipfsCli.OpenRead(blockHash) | |||||
| } else { //跳跃读 | |||||
| ipfsCli, err := stgglb.IPFSPool.Acquire() | |||||
| if err != nil { | |||||
| logger.Warnf("new ipfs client: %s", err.Error()) | |||||
| return nil, err | |||||
| } | |||||
| defer ipfsCli.Close() | |||||
| pipeReader, pipeWriter := io.Pipe() | |||||
| go func() { | |||||
| for i := int64(r.chunkSize * int64(innerID)); i < r.fileSize; i += int64(r.ecK) * r.chunkSize { | |||||
| reader, err := ipfsCli.OpenRead(r.fileHash, ipfs.ReadOption{i, r.chunkSize}) | |||||
| if err != nil { | |||||
| pipeWriter.Close() | |||||
| errMsg <- err | |||||
| return | |||||
| } | |||||
| data, err := ioutil.ReadAll(reader) | |||||
| if err != nil { | |||||
| pipeWriter.Close() | |||||
| errMsg <- err | |||||
| return | |||||
| } | |||||
| reader.Close() | |||||
| _, err = pipeWriter.Write(data) | |||||
| if err != nil { | |||||
| pipeWriter.Close() | |||||
| errMsg <- err | |||||
| return | |||||
| } | |||||
| } | |||||
| //如果文件大小不是分块的整数倍,可能需要补0 | |||||
| if r.fileSize%(r.chunkSize*int64(r.ecK)) != 0 { | |||||
| //pktNum_1:chunkNum-1 | |||||
| pktNum_1 := r.fileSize / (r.chunkSize * int64(r.ecK)) | |||||
| offset := (r.fileSize - int64(pktNum_1)*int64(r.ecK)*r.chunkSize) | |||||
| count0 := int64(innerID)*int64(r.ecK)*r.chunkSize - offset | |||||
| if count0 > 0 { | |||||
| add0 := make([]byte, count0) | |||||
| pipeWriter.Write(add0) | |||||
| } | |||||
| } | |||||
| pipeWriter.Close() | |||||
| errMsg <- nil | |||||
| }() | |||||
| return pipeReader, nil | |||||
| } | |||||
| } | |||||
| @@ -0,0 +1,221 @@ | |||||
| package ec | |||||
| import ( | |||||
| "bytes" | |||||
| "fmt" | |||||
| "io" | |||||
| "io/ioutil" | |||||
| "testing" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ipfs" | |||||
| //"gitlink.org.cn/cloudream/common/pkgs/ipfs" | |||||
| //"gitlink.org.cn/cloudream/storage/agent/internal/config" | |||||
| //stgglb "gitlink.org.cn/cloudream/storage/common/globals" | |||||
| stgglb "gitlink.org.cn/cloudream/storage/common/globals" | |||||
| ) | |||||
| func test_Encode(t *testing.T) { | |||||
| enc, _ := NewRs(3, 5, 10) | |||||
| rc := make([]io.ReadCloser, 3) | |||||
| rc[0] = ioutil.NopCloser(bytes.NewBufferString("11111111")) | |||||
| rc[1] = ioutil.NopCloser(bytes.NewBufferString("22222222")) | |||||
| rc[2] = ioutil.NopCloser(bytes.NewBufferString("33333333")) | |||||
| /*rc[0].Close() | |||||
| rc[1].Close() | |||||
| rc[2].Close()*/ | |||||
| print("#$$$$$$$$$$$") | |||||
| out, _ := enc.ReconstructData(rc, []int{0, 1, 2}) | |||||
| //out, _ := enc.Encode(rc) | |||||
| buf := make([]byte, 100) | |||||
| out[0].Read(buf) | |||||
| fmt.Println(buf) | |||||
| out[1].Read(buf) | |||||
| fmt.Println(buf) | |||||
| t.Logf(string(buf)) | |||||
| t.Log(buf) | |||||
| } | |||||
| /* | |||||
| ------------------------------------------------ | |||||
| hash:QmX49sGugmtVPfNo13q84YL1NwGmr5yzWDDmJZ7PniQ9b6 | |||||
| 内容:1111122222233333333334444444445663454543534534 | |||||
| hash:QmcN1EJm2w9XT62Q9YqA5Ym7YDzjmnqJYc565bzRs5VosW | |||||
| (5,3),chunkSize:6 | |||||
| data1:QmS2t7xFgTMTX2DGYsbDdmHnGvaG6sc7D9k1R2WZyuDx56 | |||||
| data2:QmUSZvuABjfGKF1c4VxvVBdH31SroDm2QyLGBrVFomRM8P | |||||
| data3:QmcD3RpUh5rwMhf9yBywBeT6ibT1P5DSJC67aoD77jhTBn | |||||
| 内容:qqqqqqqqwwwwwwwwwwwwwweeeeeeeeeeeeerrrrrrrrrrr | |||||
| ----------------------------------------------------- | |||||
| */ | |||||
| func test_Fetch(t *testing.T) { | |||||
| blkReader, _ := NewBlockReader() | |||||
| /*****************************FetchBlock*************************/ | |||||
| /*r, _ := blkReader.FetchBLock("QmX49sGugmtVPfNo13q84YL1NwGmr5yzWDDmJZ7PniQ9b6") | |||||
| data, _ := ioutil.ReadAll(r) | |||||
| t.Logf(string(data))*/ | |||||
| /**********************FetchBlocks************************************ | |||||
| hashs := []string{"QmcN1EJm2w9XT62Q9YqA5Ym7YDzjmnqJYc565bzRs5VosW", "QmX49sGugmtVPfNo13q84YL1NwGmr5yzWDDmJZ7PniQ9b6"} | |||||
| rs, _ := blkReader.FetchBLocks(hashs) | |||||
| data1, _ := ioutil.ReadAll(rs[0]) | |||||
| data2, _ := ioutil.ReadAll(rs[1]) | |||||
| t.Logf(string(data1)) | |||||
| t.Logf(string(data2)) | |||||
| /*************************JumpFetchBlock*********************************/ | |||||
| blkReader.SetJumpRead("QmcN1EJm2w9XT62Q9YqA5Ym7YDzjmnqJYc565bzRs5VosW", 46, 3) | |||||
| blkReader.SetchunkSize(6) | |||||
| r, _ := blkReader.JumpFetchBlock(1) | |||||
| data, _ := ioutil.ReadAll(r) | |||||
| t.Logf(string(data)) | |||||
| } | |||||
| func test_Fetch_and_Encode(t *testing.T) { | |||||
| chunkSize := int64(6) | |||||
| blkReader, _ := NewBlockReader() | |||||
| defer blkReader.Close() | |||||
| blkReader.SetJumpRead("QmcN1EJm2w9XT62Q9YqA5Ym7YDzjmnqJYc565bzRs5VosW", 46, 3) | |||||
| blkReader.SetchunkSize(int64(chunkSize)) | |||||
| dataBlocks := make([]io.ReadCloser, 3) | |||||
| for i := range dataBlocks { | |||||
| dataBlocks[i], _ = blkReader.JumpFetchBlock(i) | |||||
| } | |||||
| enc, _ := NewRs(3, 5, chunkSize) | |||||
| parityBlocks, _ := enc.Encode(dataBlocks) | |||||
| parityData := make([]string, 2) | |||||
| finished := false | |||||
| for { | |||||
| if finished { | |||||
| break | |||||
| } | |||||
| buf := make([]byte, chunkSize) | |||||
| for i, pipe := range parityBlocks { | |||||
| _, err := pipe.Read(buf) | |||||
| if err != nil { | |||||
| finished = true | |||||
| break | |||||
| } | |||||
| parityData[i] = parityData[i] + string(buf) | |||||
| } | |||||
| } | |||||
| t.Logf(parityData[0]) | |||||
| t.Logf(parityData[1]) | |||||
| } | |||||
| func test_Fetch_and_Encode_and_Degraded(t *testing.T) { | |||||
| chunkSize := int64(6) | |||||
| blkReader, _ := NewBlockReader() | |||||
| defer blkReader.Close() | |||||
| blkReader.SetJumpRead("QmcN1EJm2w9XT62Q9YqA5Ym7YDzjmnqJYc565bzRs5VosW", 46, 3) | |||||
| blkReader.SetchunkSize(int64(chunkSize)) | |||||
| dataBlocks := make([]io.ReadCloser, 3) | |||||
| for i := range dataBlocks { | |||||
| dataBlocks[i], _ = blkReader.JumpFetchBlock(i) | |||||
| } | |||||
| enc, _ := NewRs(3, 5, chunkSize) | |||||
| parityBlocks, _ := enc.Encode(dataBlocks) | |||||
| go func() { | |||||
| ioutil.ReadAll(parityBlocks[0]) | |||||
| }() | |||||
| degradedBlocks := make([]io.ReadCloser, 3) | |||||
| degradedBlocks[0], _ = blkReader.JumpFetchBlock(1) | |||||
| degradedBlocks[1], _ = blkReader.JumpFetchBlock(2) | |||||
| degradedBlocks[2] = parityBlocks[1] | |||||
| newDataBlocks, _ := enc.ReconstructData(degradedBlocks, []int{1, 2, 4}) | |||||
| newData := make([]string, 3) | |||||
| finished := false | |||||
| for { | |||||
| if finished { | |||||
| break | |||||
| } | |||||
| buf := make([]byte, chunkSize) | |||||
| for i, pipe := range newDataBlocks { | |||||
| _, err := pipe.Read(buf) | |||||
| if err != nil { | |||||
| finished = true | |||||
| break | |||||
| } | |||||
| newData[i] = newData[i] + string(buf) | |||||
| } | |||||
| } | |||||
| t.Logf(newData[0]) | |||||
| t.Logf(newData[1]) | |||||
| t.Logf(newData[2]) | |||||
| } | |||||
| func test_pin_data_blocks(t *testing.T) { | |||||
| chunkSize := int64(6) | |||||
| blkReader, _ := NewBlockReader() | |||||
| defer blkReader.Close() | |||||
| blkReader.SetJumpRead("QmcN1EJm2w9XT62Q9YqA5Ym7YDzjmnqJYc565bzRs5VosW", 46, 3) | |||||
| blkReader.SetchunkSize(int64(chunkSize)) | |||||
| dataBlocks := make([]io.ReadCloser, 3) | |||||
| ipfsclient, _ := stgglb.IPFSPool.Acquire() | |||||
| for i := range dataBlocks { | |||||
| dataBlocks[i], _ = blkReader.JumpFetchBlock(i) | |||||
| hash, _ := ipfsclient.CreateFile(dataBlocks[i]) | |||||
| t.Logf(hash) | |||||
| } | |||||
| } | |||||
| func print_ioreaders(t *testing.T, readers []io.ReadCloser, chunkSize int64) { | |||||
| newData := make([]string, len(readers)) | |||||
| finished := false | |||||
| for { | |||||
| if finished { | |||||
| break | |||||
| } | |||||
| buf := make([]byte, chunkSize) | |||||
| for i, pipe := range readers { | |||||
| _, err := pipe.Read(buf) | |||||
| if err != nil { | |||||
| finished = true | |||||
| break | |||||
| } | |||||
| newData[i] = newData[i] + string(buf) | |||||
| } | |||||
| } | |||||
| for _, data := range newData { | |||||
| t.Logf(data) | |||||
| } | |||||
| } | |||||
| func test_reconstructData(t *testing.T) { | |||||
| blkReader, _ := NewBlockReader() | |||||
| defer blkReader.Close() | |||||
| hashs := []string{"QmS2t7xFgTMTX2DGYsbDdmHnGvaG6sc7D9k1R2WZyuDx56", "QmUSZvuABjfGKF1c4VxvVBdH31SroDm2QyLGBrVFomRM8P", "QmcD3RpUh5rwMhf9yBywBeT6ibT1P5DSJC67aoD77jhTBn"} | |||||
| dataBlocks, _ := blkReader.FetchBLocks(hashs) | |||||
| chunkSize := int64(6) | |||||
| enc, _ := NewRs(3, 5, chunkSize) | |||||
| print("@@@@@@@@@") | |||||
| newDataBlocks, _ := enc.ReconstructSome(dataBlocks, []int{0, 1, 2}, []int{3, 4}) | |||||
| print("!!!!!!!!!") | |||||
| print_ioreaders(t, newDataBlocks, chunkSize) | |||||
| } | |||||
| func Test_main(t *testing.T) { | |||||
| //test_Encode(t) | |||||
| //stgglb.InitLocal(&config.Cfg().Local) | |||||
| stgglb.InitIPFSPool(&ipfs.Config{Port: 5001}) | |||||
| //test_Fetch(t) | |||||
| //test_Fetch_and_Encode(t) | |||||
| //test_Fetch_and_Encode_and_Degraded(t) | |||||
| //test_pin_data_blocks(t) | |||||
| test_reconstructData(t) | |||||
| } | |||||
| /* | |||||
| func Test_Fetch_Encode_ReconstructData(t *testing.T) { | |||||
| inFileName := "test.txt" | |||||
| enc, _ := NewRs(3, 5, 10) | |||||
| file, err := os.Open(inFileName) | |||||
| if err != nil { | |||||
| t.Error(err) | |||||
| } | |||||
| var data io.ReadCloser | |||||
| data = file | |||||
| //enc.Encode(data) | |||||
| }*/ | |||||
| @@ -0,0 +1,217 @@ | |||||
| package ec | |||||
| import ( | |||||
| "io" | |||||
| "github.com/klauspost/reedsolomon" | |||||
| ) | |||||
| type Rs struct { | |||||
| encoder reedsolomon.Encoder | |||||
| ecN int | |||||
| ecK int | |||||
| ecP int | |||||
| chunkSize int64 | |||||
| } | |||||
| func NewRs(k int, n int, chunkSize int64) (*Rs, error) { | |||||
| enc := Rs{ | |||||
| ecN: n, | |||||
| ecK: k, | |||||
| ecP: n - k, | |||||
| chunkSize: chunkSize, | |||||
| } | |||||
| encoder, err := reedsolomon.New(k, n-k) | |||||
| enc.encoder = encoder | |||||
| return &enc, err | |||||
| } | |||||
| // 编码 | |||||
| func (r *Rs) Encode(data []io.ReadCloser) ([]io.ReadCloser, error) { | |||||
| output := make([]io.ReadCloser, r.ecP) | |||||
| parity := make([]*io.PipeWriter, r.ecP) | |||||
| for i := range output { | |||||
| var reader *io.PipeReader | |||||
| reader, parity[i] = io.Pipe() | |||||
| output[i] = reader | |||||
| } | |||||
| go func() { | |||||
| chunks := make([][]byte, r.ecN) | |||||
| for i := range chunks { | |||||
| chunks[i] = make([]byte, r.chunkSize) | |||||
| } | |||||
| for { | |||||
| finished := false | |||||
| //读数据块到buff | |||||
| for i := 0; i < r.ecK; i++ { | |||||
| _, err := data[i].Read(chunks[i]) | |||||
| if err != nil { | |||||
| finished = true | |||||
| break | |||||
| } | |||||
| } | |||||
| if finished { | |||||
| break | |||||
| } | |||||
| //编码 | |||||
| err := r.encoder.Encode(chunks) | |||||
| if err != nil { | |||||
| return | |||||
| } | |||||
| //输出到writer | |||||
| for i := r.ecK; i < r.ecN; i++ { | |||||
| parity[i-r.ecK].Write(chunks[i]) | |||||
| } | |||||
| } | |||||
| for i := range data { | |||||
| data[i].Close() | |||||
| } | |||||
| for i := range parity { | |||||
| parity[i].Close() | |||||
| } | |||||
| }() | |||||
| return output, nil | |||||
| } | |||||
| // 降级读,任意k个块恢复出原始数据块 | |||||
| func (r *Rs) ReconstructData(input []io.ReadCloser, inBlockIdx []int) ([]io.ReadCloser, error) { | |||||
| dataReader := make([]io.ReadCloser, r.ecK) | |||||
| dataWriter := make([]*io.PipeWriter, r.ecK) | |||||
| for i := 0; i < r.ecK; i++ { | |||||
| var reader *io.PipeReader | |||||
| reader, dataWriter[i] = io.Pipe() | |||||
| dataReader[i] = reader | |||||
| } | |||||
| go func() { | |||||
| chunks := make([][]byte, r.ecN) | |||||
| for i := range chunks { | |||||
| chunks[i] = make([]byte, r.chunkSize) | |||||
| } | |||||
| constructIdx := make([]bool, r.ecN) | |||||
| for i := 0; i < r.ecN; i++ { | |||||
| constructIdx[i] = false | |||||
| } | |||||
| for i := 0; i < r.ecK; i++ { | |||||
| constructIdx[inBlockIdx[i]] = true | |||||
| } | |||||
| nilIdx := make([]int, r.ecP) | |||||
| ct := 0 | |||||
| for i := 0; i < r.ecN; i++ { | |||||
| if !constructIdx[i] { | |||||
| nilIdx[ct] = i | |||||
| ct++ | |||||
| } | |||||
| } | |||||
| for { | |||||
| finished := false | |||||
| //读数据块到buff | |||||
| for i := 0; i < r.ecK; i++ { | |||||
| _, err := input[i].Read(chunks[inBlockIdx[i]]) | |||||
| if err != nil { | |||||
| finished = true | |||||
| break | |||||
| } | |||||
| } | |||||
| for i := 0; i < r.ecP; i++ { | |||||
| chunks[nilIdx[i]] = nil | |||||
| } | |||||
| if finished { | |||||
| break | |||||
| } | |||||
| //解码 | |||||
| err := r.encoder.ReconstructData(chunks) | |||||
| if err != nil { | |||||
| return | |||||
| } | |||||
| //输出到writer | |||||
| for i := 0; i < r.ecK; i++ { | |||||
| dataWriter[i].Write(chunks[i]) | |||||
| } | |||||
| } | |||||
| for i := range input { | |||||
| input[i].Close() | |||||
| } | |||||
| for i := range dataWriter { | |||||
| dataWriter[i].Close() | |||||
| } | |||||
| }() | |||||
| return dataReader, nil | |||||
| } | |||||
| // 修复,任意k个块恢复若干想要的块 | |||||
| func (r *Rs) ReconstructSome(input []io.ReadCloser, inBlockIdx []int, outBlockIdx []int) ([]io.ReadCloser, error) { | |||||
| outReader := make([]io.ReadCloser, len(outBlockIdx)) | |||||
| outWriter := make([]*io.PipeWriter, len(outBlockIdx)) | |||||
| for i := 0; i < len(outBlockIdx); i++ { | |||||
| var reader *io.PipeReader | |||||
| reader, outWriter[i] = io.Pipe() | |||||
| outReader[i] = reader | |||||
| } | |||||
| go func() { | |||||
| chunks := make([][]byte, r.ecN) | |||||
| for i := range chunks { | |||||
| chunks[i] = make([]byte, r.chunkSize) | |||||
| } | |||||
| finished := false | |||||
| //outBools:要输出的若干块idx | |||||
| outBools := make([]bool, r.ecN) | |||||
| for i := range outBools { | |||||
| outBools[i] = false | |||||
| } | |||||
| for i := range outBlockIdx { | |||||
| outBools[outBlockIdx[i]] = true | |||||
| } | |||||
| constructIdx := make([]bool, r.ecN) | |||||
| for i := 0; i < r.ecN; i++ { | |||||
| constructIdx[i] = false | |||||
| } | |||||
| for i := 0; i < r.ecK; i++ { | |||||
| constructIdx[inBlockIdx[i]] = true | |||||
| } | |||||
| //nil Idx就是没有输入的块idx,要置成nil | |||||
| nilIdx := make([]int, r.ecP) | |||||
| ct := 0 | |||||
| for i := 0; i < r.ecN; i++ { | |||||
| if !constructIdx[i] { | |||||
| nilIdx[ct] = i | |||||
| ct++ | |||||
| } | |||||
| } | |||||
| for { | |||||
| //读块到buff | |||||
| for i := 0; i < r.ecK; i++ { | |||||
| _, err := input[i].Read(chunks[inBlockIdx[i]]) | |||||
| if err != nil { | |||||
| finished = true | |||||
| break | |||||
| } | |||||
| } | |||||
| for i := 0; i < r.ecP; i++ { | |||||
| chunks[nilIdx[i]] = nil | |||||
| } | |||||
| if finished { | |||||
| break | |||||
| } | |||||
| //解码 | |||||
| err := r.encoder.ReconstructSome(chunks, outBools) | |||||
| if err != nil { | |||||
| return | |||||
| } | |||||
| //输出到outWriter | |||||
| for i := range outBlockIdx { | |||||
| outWriter[i].Write(chunks[outBlockIdx[i]]) | |||||
| } | |||||
| } | |||||
| for i := range input { | |||||
| input[i].Close() | |||||
| } | |||||
| for i := range outWriter { | |||||
| outWriter[i].Close() | |||||
| } | |||||
| }() | |||||
| return outReader, nil | |||||
| } | |||||
| @@ -50,6 +50,7 @@ require ( | |||||
| github.com/json-iterator/go v1.1.12 // indirect | github.com/json-iterator/go v1.1.12 // indirect | ||||
| github.com/jtolds/gls v4.20.0+incompatible // indirect | github.com/jtolds/gls v4.20.0+incompatible // indirect | ||||
| github.com/klauspost/cpuid/v2 v2.2.4 // indirect | github.com/klauspost/cpuid/v2 v2.2.4 // indirect | ||||
| github.com/klauspost/reedsolomon v1.11.8 // indirect | |||||
| github.com/leodido/go-urn v1.2.4 // indirect | github.com/leodido/go-urn v1.2.4 // indirect | ||||
| github.com/libp2p/go-buffer-pool v0.1.0 // indirect | github.com/libp2p/go-buffer-pool v0.1.0 // indirect | ||||
| github.com/libp2p/go-flow-metrics v0.1.0 // indirect | github.com/libp2p/go-flow-metrics v0.1.0 // indirect | ||||
| @@ -62,6 +62,7 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= | |||||
| github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= | github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= | ||||
| github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= | github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= | ||||
| github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= | github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= | ||||
| github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= | |||||
| github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= | github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= | ||||
| github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= | github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= | ||||
| github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= | github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= | ||||
| @@ -89,6 +90,8 @@ github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02 | |||||
| github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= | github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= | ||||
| github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk= | github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk= | ||||
| github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= | github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= | ||||
| github.com/klauspost/reedsolomon v1.11.8 h1:s8RpUW5TK4hjr+djiOpbZJB4ksx+TdYbRH7vHQpwPOY= | |||||
| github.com/klauspost/reedsolomon v1.11.8/go.mod h1:4bXRN+cVzMdml6ti7qLouuYi32KHJ5MGv0Qd8a47h6A= | |||||
| github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= | github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= | ||||
| github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4= | github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4= | ||||
| github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= | github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= | ||||
| @@ -107,6 +110,7 @@ github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4 | |||||
| github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= | github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= | ||||
| github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg= | github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg= | ||||
| github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= | github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= | ||||
| github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= | |||||
| github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= | github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= | ||||
| github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= | github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= | ||||
| github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= | github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= | ||||