package exec import ( "context" "fmt" "io" "sync" "github.com/hashicorp/go-multierror" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/math2" ) type Driver struct { planID PlanID planBlder *PlanBuilder callback *future.SetValueFuture[PlanResult] ctx *ExecContext cancel context.CancelFunc driverExec *Executor } // 开始写入一个流。此函数会将输入视为一个完整的流,因此会给流包装一个Range来获取只需要的部分。 func (e *Driver) BeginWrite(str io.ReadCloser, handle *DriverWriteStream) { e.driverExec.PutVar(handle.ID, &StreamValue{Stream: io2.NewRange(str, handle.RangeHint.Offset, handle.RangeHint.Length)}) } // 开始写入一个流。此函数默认输入流已经是Handle的RangeHint锁描述的范围,因此不会做任何其他处理 func (e *Driver) BeginWriteRanged(str io.ReadCloser, handle *DriverWriteStream) { e.driverExec.PutVar(handle.ID, &StreamValue{Stream: str}) } func (e *Driver) BeginRead(handle *DriverReadStream) (io.ReadCloser, error) { str, err := BindVar[*StreamValue](e.driverExec, e.ctx.Context, handle.ID) if err != nil { return nil, fmt.Errorf("bind vars: %w", err) } return str.Stream, nil } func (e *Driver) Signal(signal *DriverSignalVar) { e.driverExec.PutVar(signal.ID, &SignalValue{}) } func (e *Driver) Wait(ctx context.Context) (PlanResult, error) { ret, err := e.callback.Wait(ctx) if err != nil { return PlanResult{}, err } return ret, nil } func (e *Driver) execute() { wg := sync.WaitGroup{} retLock := sync.Mutex{} var execErr error stored := make(map[string][]VarValue) for _, p := range e.planBlder.WorkerPlans { wg.Add(1) go func(p *WorkerPlanBuilder, ctx context.Context, cancel context.CancelFunc) { defer wg.Done() plan := Plan{ ID: e.planID, Ops: p.Ops, } cli, err := p.Worker.NewClient() if err != nil { retLock.Lock() execErr = multierror.Append(execErr, fmt.Errorf("worker %v: new client: %w", p.Worker, err)) retLock.Unlock() cancel() return } defer cli.Close() ret, err := cli.ExecutePlan(ctx, plan) if err != nil { retLock.Lock() execErr = multierror.Append(execErr, fmt.Errorf("worker %v: execute plan: %w", p.Worker, err)) retLock.Unlock() cancel() return } retLock.Lock() for k, v := range ret.Stored { stored[k] = append(stored[k], v...) } retLock.Unlock() }(p, e.ctx.Context, e.cancel) } ret, err := e.driverExec.Run(e.ctx) if err != nil { retLock.Lock() execErr = multierror.Append(execErr, fmt.Errorf("driver: execute plan: %w", err)) retLock.Unlock() e.cancel() } wg.Wait() for k, v := range ret.Stored { stored[k] = append(stored[k], v...) } e.callback.SetComplete(PlanResult{ Stored: stored, }, execErr) } type DriverWriteStream struct { ID VarID RangeHint *math2.Range } type DriverReadStream struct { ID VarID } type DriverSignalVar struct { ID VarID Signal SignalValue } type PlanResult struct { Stored map[string][]VarValue } func (r *PlanResult) Get(key string) VarValue { v, ok := r.Stored[key] if !ok || len(v) == 0 { return nil } return v[0] } func (r *PlanResult) GetArray(key string) []VarValue { v, ok := r.Stored[key] if !ok { return nil } return v }