diff --git a/agent/internal/http/hub_io.go b/agent/internal/http/hub_io.go index 9fabf18..36e4e5d 100644 --- a/agent/internal/http/hub_io.go +++ b/agent/internal/http/hub_io.go @@ -10,9 +10,12 @@ import ( "github.com/gin-gonic/gin" "github.com/inhies/go-bytesize" "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" + "gitlink.org.cn/cloudream/common/utils/http2" + "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/serder" ) @@ -57,88 +60,79 @@ func (s *IOService) GetStream(ctx *gin.Context) { } defer strVal.Stream.Close() - ctx.Header("Content-Type", "application/octet-stream") + ctx.Header("Content-Type", http2.ContentTypeOctetStream) startTime := time.Now() - n, err := cdsapi.WriteStream(ctx.Writer, strVal.Stream) + + cw := http2.NewChunkedWriter(io2.NopWriteCloser(ctx.Writer)) + n, err := cw.WriteStreamPart("stream", strVal.Stream) + dt := time.Since(startTime) + log.Debugf("size: %v, time: %v, speed: %v/s", n, dt, bytesize.New(float64(n)/dt.Seconds())) + if err != nil { - log.Warnf("sending stream: %v", err) + log.Warnf("writing stream part: %v", err) + cw.Abort(err.Error()) return } - dt := time.Since(startTime) - log.Debugf("send stream completed, size: %v, time: %v, speed: %v/s", n, dt, bytesize.New(float64(n)/dt.Seconds())) + err = cw.Finish() + if err != nil { + log.Warnf("finishing chunked writer: %v", err) + return + } } func (s *IOService) SendStream(ctx *gin.Context) { - ctx.JSON(http.StatusBadRequest, Failed(errorcode.OperationFailed, "not implemented")) - return + cr := http2.NewChunkedReader(ctx.Request.Body) + _, infoData, err := cr.NextDataPart() + if err != nil { + logger.Warnf("reading info data: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.OperationFailed, fmt.Sprintf("reading info data: %v", err))) + return + } - // var req cdsapi.SendStreamReq - // if err := ctx.ShouldBindJSON(&req); err != nil { - // logger.Warnf("binding body: %s", err.Error()) - // ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) - // return - // } + info, err := serder.JSONToObjectEx[cdsapi.SendStreamInfo](infoData) + if err != nil { + logger.Warnf("deserializing info data: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, fmt.Sprintf("deserializing info data: %v", err))) + return + } - // logger. - // WithField("PlanID", req.PlanID). - // WithField("VarID", req.VarID). - // Debugf("stream input") + _, stream, err := cr.NextPart() + if err != nil { + logger.Warnf("reading stream data: %s", err.Error()) + ctx.JSON(http.StatusBadRequest, Failed(errorcode.OperationFailed, fmt.Sprintf("reading stream data: %v", err))) + return + } + defer cr.Close() // 超时设置 - // c, cancel := context.WithTimeout(ctx.Request.Context(), time.Second*30) - // defer cancel() - - // sw := s.svc.swWorker.FindByIDContexted(c, req.PlanID) - // if sw == nil { - // ctx.JSON(http.StatusNotFound, gin.H{"error": "plan not found"}) - // return - // } - - // pr, pw := io.Pipe() - // defer pr.Close() - - // streamVar := &exec.StreamVar{ - // ID: req.VarID, - // Stream: pr, - // } - // sw.PutVar(streamVar) - - // var recvSize int64 - - // go func() { - // defer pw.Close() - // _, err := io.Copy(pw, ctx.Request.Body) - // if err != nil { - // logger.Warnf("write data to file failed, err: %s", err.Error()) - // pw.CloseWithError(fmt.Errorf("write data to file failed: %w", err)) - // } - // }() - - // for { - // buf := make([]byte, 1024*64) - // n, err := pr.Read(buf) - // if err != nil { - // if err == io.EOF { - // logger.WithField("ReceiveSize", recvSize). - // WithField("VarID", req.VarID). - // Debugf("file transmission completed") - - // // 将结果返回给客户端 - // ctx.JSON(http.StatusOK, gin.H{"message": "file transmission completed"}) - // return - // } - // logger.Warnf("read stream failed, err: %s", err.Error()) - // ctx.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("read stream failed: %v", err)}) - // return - // } - - // if n > 0 { - // recvSize += int64(n) - // // 处理接收到的数据,例如写入文件或进行其他操作 - // } - // } + c, cancel := context.WithTimeout(ctx.Request.Context(), time.Second*30) + defer cancel() + + sw := s.svc.swWorker.FindByIDContexted(c, info.PlanID) + if sw == nil { + ctx.JSON(http.StatusNotFound, gin.H{"error": "plan not found"}) + return + } + + fut := future.NewSetVoid() + sw.PutVar(info.VarID, &exec.StreamValue{ + Stream: io2.DelegateReadCloser(stream, func() error { + fut.SetVoid() + return nil + }), + }) + + // 等待流发送完毕才能发送响应 + err = fut.Wait(ctx.Request.Context()) + if err != nil { + logger.Warnf("sending stream: %s", err.Error()) + ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("sending stream: %v", err))) + return + } + + ctx.JSON(http.StatusOK, OK(nil)) } func (s *IOService) ExecuteIOPlan(ctx *gin.Context) { diff --git a/common/pkgs/ioswitch2/ops2/ec.go b/common/pkgs/ioswitch2/ops2/ec.go index 8f983d0..2235683 100644 --- a/common/pkgs/ioswitch2/ops2/ec.go +++ b/common/pkgs/ioswitch2/ops2/ec.go @@ -16,99 +16,9 @@ import ( ) func init() { - // exec.UseOp[*ECReconstructAny]() - // exec.UseOp[*ECReconstruct]() exec.UseOp[*ECMultiply]() } -/* - type ECReconstructAny struct { - EC cdssdk.ECRedundancy `json:"ec"` - Inputs []exec.VarID `json:"inputs"` - Outputs []exec.VarID `json:"outputs"` - InputBlockIndexes []int `json:"inputBlockIndexes"` - OutputBlockIndexes []int `json:"outputBlockIndexes"` - } - - func (o *ECReconstructAny) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - rs, err := ec.NewStreamRs(o.EC.K, o.EC.N, o.EC.ChunkSize) - if err != nil { - return fmt.Errorf("new ec: %w", err) - } - - err = exec.BindArrayVars(e, ctx.Context, inputs) - if err != nil { - return err - } - defer func() { - for _, s := range o.Inputs { - s.Stream.Close() - } - }() - - var inputs []io.Reader - for _, s := range o.Inputs { - inputs = append(inputs, s.Stream) - } - - outputs := rs.ReconstructAny(inputs, o.InputBlockIndexes, o.OutputBlockIndexes) - - sem := semaphore.NewWeighted(int64(len(o.Outputs))) - for i := range o.Outputs { - sem.Acquire(ctx.Context, 1) - - o.Outputs[i].Stream = io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { - sem.Release(1) - }) - } - e.PutVar(o.Outputs) - - return sem.Acquire(ctx.Context, int64(len(o.Outputs))) - } - - type ECReconstruct struct { - EC cdssdk.ECRedundancy `json:"ec"` - Inputs []exec.VarID `json:"inputs"` - Outputs []exec.VarID `json:"outputs"` - InputBlockIndexes []int `json:"inputBlockIndexes"` - } - - func (o *ECReconstruct) Execute(ctx context.Context, e *exec.Executor) error { - rs, err := ec.NewStreamRs(o.EC.K, o.EC.N, o.EC.ChunkSize) - if err != nil { - return fmt.Errorf("new ec: %w", err) - } - - err = exec.BindArrayVars(e, ctx, o.Inputs) - if err != nil { - return err - } - defer func() { - for _, s := range o.Inputs { - s.Stream.Close() - } - }() - - var inputs []io.Reader - for _, s := range o.Inputs { - inputs = append(inputs, s.Stream) - } - - outputs := rs.ReconstructData(inputs, o.InputBlockIndexes) - - sem := semaphore.NewWeighted(int64(len(o.Outputs))) - for i := range o.Outputs { - sem.Acquire(ctx, 1) - - o.Outputs[i].Stream = io2.AfterReadClosedOnce(outputs[i], func(closer io.ReadCloser) { - sem.Release(1) - }) - } - e.PutVar(o.Outputs) - - return sem.Acquire(ctx, int64(len(o.Outputs))) - } -*/ type ECMultiply struct { Coef [][]byte `json:"coef"` Inputs []exec.VarID `json:"inputs"`