diff --git a/common/models/models.go b/common/models/models.go index 967c45a..bb945a8 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -34,7 +34,7 @@ type EC struct { ID int64 `json:"id"` K int `json:"k"` N int `json:"n"` - ChunkSize int `json:"chunkSize"` + ChunkSize int64 `json:"chunkSize"` } type ObjectBlockData struct { @@ -51,7 +51,7 @@ func NewObjectBlockData(index int, fileHash string, nodeIDs []int64) ObjectBlock } } -func NewEc(id int64, k int, n int, chunkSize int) EC { +func NewEc(id int64, k int, n int, chunkSize int64) EC { return EC{ ID: id, K: k, diff --git a/common/pkgs/ioswitch/ops/ops.go b/common/pkgs/ioswitch/ops/ops.go index d6b8717..fc1fed1 100644 --- a/common/pkgs/ioswitch/ops/ops.go +++ b/common/pkgs/ioswitch/ops/ops.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "sync" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/logger" @@ -13,6 +14,7 @@ import ( stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage/common/pkgs/ec" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) @@ -169,7 +171,40 @@ type ECCompute struct { } func (o *ECCompute) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { - // TODO2 + rs, err := ec.NewRs(o.EC.K, o.EC.N, o.EC.ChunkSize) + if err != nil { + return fmt.Errorf("new ec: %w", err) + } + + strs, err := sw.WaitStreams(planID, o.InputIDs...) + if err != nil { + return err + } + defer func() { + for _, s := range strs { + s.Stream.Close() + } + }() + + var inputs []io.ReadCloser + for _, s := range strs { + inputs = append(inputs, s.Stream) + } + + outputs, err := rs.ReconstructSome(inputs, o.InputBlockIndexes, o.OutputBlockIndexes) + if err != nil { + return fmt.Errorf("reconstructing: %w", err) + } + + wg := sync.WaitGroup{} + for i, id := range o.OutputIDs { + wg.Add(1) + sw.StreamReady(planID, ioswitch.NewStream(id, myio.AfterReadClosed(outputs[i], func(closer io.ReadCloser) { + wg.Done() + }))) + } + wg.Wait() + return nil } diff --git a/common/pkgs/ioswitch/plans/executor.go b/common/pkgs/ioswitch/plans/executor.go index 6facacf..cc23209 100644 --- a/common/pkgs/ioswitch/plans/executor.go +++ b/common/pkgs/ioswitch/plans/executor.go @@ -1,6 +1,7 @@ package plans import ( + "context" "errors" "fmt" "io" @@ -102,6 +103,10 @@ func (e *Executor) ReadStream(info *ToExecutorStream) (io.ReadCloser, error) { }), nil } +func (e *Executor) Wait() (ExecutorResult, error) { + return e.callback.WaitValue(context.TODO()) +} + func (e *Executor) cancelAll() { for _, cli := range e.mqClis { cli.CancelIOPlan(agtmq.NewCancelIOPlan(e.plan.ID)) @@ -116,8 +121,8 @@ func (e *Executor) Close() { func (e *Executor) pollResult() { wg := sync.WaitGroup{} - anyErr := atomic.Value{} - anyErr.Store(nil) + var anyErr error + var done atomic.Bool rets := make([]*ioswitch.PlanResult, len(e.plan.AgentPlans)) for i, id := range e.planTaskIDs { @@ -131,20 +136,21 @@ func (e *Executor) pollResult() { for { resp, err := e.mqClis[idx].WaitIOPlan(agtmq.NewWaitIOPlan(taskID, 5000)) if err != nil { - anyErr.Store(err) + anyErr = err break } if resp.IsComplete { if resp.Error != "" { - anyErr.Store(errors.New(resp.Error)) + anyErr = errors.New(resp.Error) + done.Store(true) } else { rets[idx] = &resp.Result } break } - if anyErr.Load() != nil { + if done.Load() { break } } @@ -153,9 +159,8 @@ func (e *Executor) pollResult() { wg.Wait() - err := anyErr.Load().(error) - if err != nil { - e.callback.SetError(err) + if anyErr != nil { + e.callback.SetError(anyErr) return } diff --git a/common/pkgs/ioswitch/plans/plan_builder.go b/common/pkgs/ioswitch/plans/plan_builder.go index d831cd2..1d32896 100644 --- a/common/pkgs/ioswitch/plans/plan_builder.go +++ b/common/pkgs/ioswitch/plans/plan_builder.go @@ -84,7 +84,7 @@ func (s *FromExecutorStream) ToNode(node model.Node) *AgentStream { s.toNode = &node return &AgentStream{ owner: s.owner.AtAgent(node), - info: s.owner.newStream(), + info: s.info, } } @@ -108,7 +108,7 @@ func (s *AgentStream) IPFSWrite(resultKey string) { func (s *AgentStream) GRPCSend(node model.Node) *AgentStream { agtStr := &AgentStream{ owner: s.owner.owner.AtAgent(node), - info: s.owner.owner.newStream(), + info: s.info, } s.owner.ops = append(s.owner.ops, &ops.GRPCSend{ diff --git a/common/pkgs/ioswitch/switch.go b/common/pkgs/ioswitch/switch.go index 00f0bb3..aa1c584 100644 --- a/common/pkgs/ioswitch/switch.go +++ b/common/pkgs/ioswitch/switch.go @@ -42,12 +42,13 @@ type Planning struct { func NewPlanning(plan Plan) Planning { planning := Planning{ - plan: plan, - callback: future.NewSetValue[PlanResult](), - readys: make(map[StreamID]Stream), + plan: plan, + resultValues: make(map[string]any), + callback: future.NewSetValue[PlanResult](), + readys: make(map[StreamID]Stream), } - for _ = range plan.Ops { + for range plan.Ops { oping := Oping{ State: OpPending, } @@ -134,10 +135,10 @@ func (s *Switch) SetupPlan(plan Plan) error { func (s *Switch) ExecutePlan(id PlanID) (PlanResult, error) { s.lock.Lock() - defer s.lock.Unlock() planning, ok := s.plannings[id] if !ok { + s.lock.Unlock() return PlanResult{}, fmt.Errorf("plan not found") } @@ -162,6 +163,7 @@ func (s *Switch) ExecutePlan(id PlanID) (PlanResult, error) { } }() } + s.lock.Unlock() return planning.callback.WaitValue(context.TODO()) } @@ -239,12 +241,11 @@ func (s *Switch) StreamReady(planID PlanID, stream Stream) { } func (s *Switch) WaitStreams(planID PlanID, streamIDs ...StreamID) ([]Stream, error) { - s.lock.Lock() - defer s.lock.Unlock() plan, ok := s.plannings[planID] if !ok { + s.lock.Unlock() return nil, ErrPlanNotFound } @@ -262,6 +263,7 @@ func (s *Switch) WaitStreams(planID PlanID, streamIDs ...StreamID) ([]Stream, er } if allReady { + s.lock.Unlock() return readys, nil } @@ -272,6 +274,7 @@ func (s *Switch) WaitStreams(planID PlanID, streamIDs ...StreamID) ([]Stream, er Readys: readys, Callback: callback, }) + s.lock.Unlock() return callback.WaitValue(context.TODO()) }