diff --git a/agent/internal/config/config.go b/agent/internal/config/config.go index f699534..f8c04a9 100644 --- a/agent/internal/config/config.go +++ b/agent/internal/config/config.go @@ -14,7 +14,6 @@ type Config struct { ID int64 `json:"id"` Local stgmodels.LocalMachineInfo `json:"local"` GRPC *grpc.Config `json:"grpc"` - ECPacketSize int64 `json:"ecPacketSize"` TempFileLifetime int `json:"tempFileLifetime"` // temp状态的副本最多能保持多久时间,单位:秒 Logger log.Config `json:"logger"` RabbitMQ stgmq.Config `json:"rabbitMQ"` diff --git a/agent/internal/services/grpc/io.go b/agent/internal/services/grpc/io.go new file mode 100644 index 0000000..8d016a7 --- /dev/null +++ b/agent/internal/services/grpc/io.go @@ -0,0 +1,136 @@ +package grpc + +import ( + "fmt" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + myio "gitlink.org.cn/cloudream/common/utils/io" + agentserver "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +func (s *Service) SendStream(server agentserver.Agent_SendStreamServer) error { + msg, err := server.Recv() + if err != nil { + return fmt.Errorf("recving stream id packet: %w", err) + } + if msg.Type != agentserver.StreamDataPacketType_SendArgs { + return fmt.Errorf("first packet must be a SendArgs packet") + } + + logger. + WithField("PlanID", msg.PlanID). + WithField("StreamID", msg.StreamID). + Debugf("receive stream from grpc") + + pr, pw := io.Pipe() + + s.sw.StreamReady(ioswitch.PlanID(msg.PlanID), ioswitch.NewStream(ioswitch.StreamID(msg.StreamID), pr)) + + // 然后读取文件数据 + var recvSize int64 + for { + msg, err := server.Recv() + + // 读取客户端数据失败 + // 即使err是io.EOF,只要没有收到客户端包含EOF数据包就被断开了连接,就认为接收失败 + if err != nil { + // 关闭文件写入,不需要返回的hash和error + pw.CloseWithError(io.ErrClosedPipe) + logger.WithField("ReceiveSize", recvSize). + Warnf("recv message failed, err: %s", err.Error()) + return fmt.Errorf("recv message failed, err: %w", err) + } + + err = myio.WriteAll(pw, msg.Data) + if err != nil { + // 关闭文件写入,不需要返回的hash和error + pw.CloseWithError(io.ErrClosedPipe) + logger.Warnf("write data to file failed, err: %s", err.Error()) + return fmt.Errorf("write data to file failed, err: %w", err) + } + + recvSize += int64(len(msg.Data)) + + if msg.Type == agentserver.StreamDataPacketType_EOF { + // 客户端明确说明文件传输已经结束,那么结束写入,获得文件Hash + err := pw.Close() + if err != nil { + logger.Warnf("finish writing failed, err: %s", err.Error()) + return fmt.Errorf("finish writing failed, err: %w", err) + } + + // 并将结果返回到客户端 + err = server.SendAndClose(&agentserver.SendStreamResp{}) + if err != nil { + logger.Warnf("send response failed, err: %s", err.Error()) + return fmt.Errorf("send response failed, err: %w", err) + } + + return nil + } + } +} + +func (s *Service) FetchStream(req *agentserver.FetchStreamReq, server agentserver.Agent_FetchStreamServer) error { + logger. + WithField("PlanID", req.PlanID). + WithField("StreamID", req.StreamID). + Debugf("send stream by grpc") + + strs, err := s.sw.WaitStreams(ioswitch.PlanID(req.PlanID), ioswitch.StreamID(req.StreamID)) + if err != nil { + logger. + WithField("PlanID", req.PlanID). + WithField("StreamID", req.StreamID). + Warnf("watting stream: %s", err.Error()) + return fmt.Errorf("watting stream: %w", err) + } + + reader := strs[0].Stream + defer reader.Close() + + buf := make([]byte, 4096) + readAllCnt := 0 + for { + readCnt, err := reader.Read(buf) + + if readCnt > 0 { + readAllCnt += readCnt + err = server.Send(&agentserver.StreamDataPacket{ + Type: agentserver.StreamDataPacketType_Data, + Data: buf[:readCnt], + }) + if err != nil { + logger. + WithField("PlanID", req.PlanID). + WithField("StreamID", req.StreamID). + Warnf("send stream data failed, err: %s", err.Error()) + return fmt.Errorf("send stream data failed, err: %w", err) + } + } + + // 文件读取完毕 + if err == io.EOF { + logger. + WithField("PlanID", req.PlanID). + WithField("StreamID", req.StreamID). + Debugf("send data size %d", readAllCnt) + // 发送EOF消息 + server.Send(&agentserver.StreamDataPacket{ + Type: agentserver.StreamDataPacketType_EOF, + }) + return nil + } + + // io.ErrUnexpectedEOF没有读满整个buf就遇到了EOF,此时正常发送剩余数据即可。除了这两个错误之外,其他错误都中断操作 + if err != nil && err != io.ErrUnexpectedEOF { + logger. + WithField("PlanID", req.PlanID). + WithField("StreamID", req.StreamID). + Warnf("reading stream data: %s", err.Error()) + return fmt.Errorf("reading stream data: %w", err) + } + } +} diff --git a/agent/internal/services/grpc/service.go b/agent/internal/services/grpc/service.go index 1f060d2..2e61eb3 100644 --- a/agent/internal/services/grpc/service.go +++ b/agent/internal/services/grpc/service.go @@ -8,14 +8,18 @@ import ( myio "gitlink.org.cn/cloudream/common/utils/io" stgglb "gitlink.org.cn/cloudream/storage/common/globals" agentserver "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) type Service struct { agentserver.AgentServer + sw *ioswitch.Switch } -func NewService() *Service { - return &Service{} +func NewService(sw *ioswitch.Switch) *Service { + return &Service{ + sw: sw, + } } func (s *Service) SendIPFSFile(server agentserver.Agent_SendIPFSFileServer) error { @@ -59,7 +63,7 @@ func (s *Service) SendIPFSFile(server agentserver.Agent_SendIPFSFileServer) erro recvSize += int64(len(msg.Data)) - if msg.Type == agentserver.FileDataPacketType_EOF { + if msg.Type == agentserver.StreamDataPacketType_EOF { // 客户端明确说明文件传输已经结束,那么结束写入,获得文件Hash hash, err := writer.Finish() if err != nil { @@ -76,6 +80,7 @@ func (s *Service) SendIPFSFile(server agentserver.Agent_SendIPFSFileServer) erro return fmt.Errorf("send response failed, err: %w", err) } + log.Debugf("%d bytes received ", recvSize) return nil } } @@ -106,7 +111,7 @@ func (s *Service) GetIPFSFile(req *agentserver.GetIPFSFileReq, server agentserve if readCnt > 0 { readAllCnt += readCnt err = server.Send(&agentserver.FileDataPacket{ - Type: agentserver.FileDataPacketType_Data, + Type: agentserver.StreamDataPacketType_Data, Data: buf[:readCnt], }) if err != nil { @@ -121,7 +126,7 @@ func (s *Service) GetIPFSFile(req *agentserver.GetIPFSFileReq, server agentserve log.WithField("FileHash", req.FileHash).Debugf("send data size %d", readAllCnt) // 发送EOF消息 server.Send(&agentserver.FileDataPacket{ - Type: agentserver.FileDataPacketType_EOF, + Type: agentserver.StreamDataPacketType_EOF, }) return nil } diff --git a/agent/internal/services/mq/cache.go b/agent/internal/services/mq/cache.go index 9441b2b..0a36ff5 100644 --- a/agent/internal/services/mq/cache.go +++ b/agent/internal/services/mq/cache.go @@ -140,7 +140,7 @@ func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtm return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg, mvPkgTask.ResultCacheInfos)) } else { - if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { + if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { errMsg := "" if tsk.Error() != nil { diff --git a/agent/internal/services/mq/io.go b/agent/internal/services/mq/io.go new file mode 100644 index 0000000..86e8e74 --- /dev/null +++ b/agent/internal/services/mq/io.go @@ -0,0 +1,65 @@ +package mq + +import ( + "time" + + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" + mytask "gitlink.org.cn/cloudream/storage/agent/internal/task" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" +) + +func (svc *Service) SetupIOPlan(msg *agtmq.SetupIOPlan) (*agtmq.SetupIOPlanResp, *mq.CodeMessage) { + err := svc.sw.SetupPlan(msg.Plan) + if err != nil { + logger.WithField("PlanID", msg.Plan.ID).Warnf("adding plan: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "adding plan failed") + } + + return mq.ReplyOK(agtmq.NewSetupIOPlanResp()) +} + +func (svc *Service) StartIOPlan(msg *agtmq.StartIOPlan) (*agtmq.StartIOPlanResp, *mq.CodeMessage) { + tsk := svc.taskManager.StartNew(mytask.NewExecuteIOPlan(msg.PlanID)) + return mq.ReplyOK(agtmq.NewStartIOPlanResp(tsk.ID())) +} + +func (svc *Service) WaitIOPlan(msg *agtmq.WaitIOPlan) (*agtmq.WaitIOPlanResp, *mq.CodeMessage) { + tsk := svc.taskManager.FindByID(msg.TaskID) + if tsk == nil { + return nil, mq.Failed(errorcode.TaskNotFound, "task not found") + } + + if msg.WaitTimeoutMs == 0 { + tsk.Wait() + + errMsg := "" + if tsk.Error() != nil { + errMsg = tsk.Error().Error() + } + + planTsk := tsk.Body().(*mytask.ExecuteIOPlan) + return mq.ReplyOK(agtmq.NewWaitIOPlanResp(true, errMsg, planTsk.Result)) + + } else { + if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { + + errMsg := "" + if tsk.Error() != nil { + errMsg = tsk.Error().Error() + } + + planTsk := tsk.Body().(*mytask.ExecuteIOPlan) + return mq.ReplyOK(agtmq.NewWaitIOPlanResp(true, errMsg, planTsk.Result)) + } + + return mq.ReplyOK(agtmq.NewWaitIOPlanResp(false, "", ioswitch.PlanResult{})) + } +} + +func (svc *Service) CancelIOPlan(msg *agtmq.CancelIOPlan) (*agtmq.CancelIOPlanResp, *mq.CodeMessage) { + svc.sw.CancelPlan(msg.PlanID) + return mq.ReplyOK(agtmq.NewCancelIOPlanResp()) +} diff --git a/agent/internal/services/mq/object.go b/agent/internal/services/mq/object.go index 5331b84..e0ef5e6 100644 --- a/agent/internal/services/mq/object.go +++ b/agent/internal/services/mq/object.go @@ -43,7 +43,7 @@ func (svc *Service) WaitPinningObject(msg *agtmq.WaitPinningObject) (*agtmq.Wait return mq.ReplyOK(agtmq.NewWaitPinningObjectResp(true, errMsg)) } else { - if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { + if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { errMsg := "" if tsk.Error() != nil { diff --git a/agent/internal/services/mq/service.go b/agent/internal/services/mq/service.go index 18c3e56..55789d4 100644 --- a/agent/internal/services/mq/service.go +++ b/agent/internal/services/mq/service.go @@ -2,14 +2,17 @@ package mq import ( "gitlink.org.cn/cloudream/storage/agent/internal/task" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) type Service struct { taskManager *task.Manager + sw *ioswitch.Switch } -func NewService(taskMgr *task.Manager) *Service { +func NewService(taskMgr *task.Manager, sw *ioswitch.Switch) *Service { return &Service{ taskManager: taskMgr, + sw: sw, } } diff --git a/agent/internal/services/mq/storage.go b/agent/internal/services/mq/storage.go index 1941106..941a5e2 100644 --- a/agent/internal/services/mq/storage.go +++ b/agent/internal/services/mq/storage.go @@ -69,7 +69,7 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (* return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath)) } else { - if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { + if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { errMsg := "" if tsk.Error() != nil { @@ -227,7 +227,7 @@ func (svc *Service) WaitStorageCreatePackage(msg *agtmq.WaitStorageCreatePackage if msg.WaitTimeoutMs == 0 { tsk.Wait() - } else if !tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { + } else if !tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(false, "", 0)) } diff --git a/agent/internal/task/execute_io_plan.go b/agent/internal/task/execute_io_plan.go new file mode 100644 index 0000000..06f3ed7 --- /dev/null +++ b/agent/internal/task/execute_io_plan.go @@ -0,0 +1,45 @@ +package task + +import ( + "fmt" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/task" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +// TODO 临时使用Task来等待Plan执行进度 +type ExecuteIOPlan struct { + PlanID ioswitch.PlanID + Result ioswitch.PlanResult +} + +func NewExecuteIOPlan(planID ioswitch.PlanID) *ExecuteIOPlan { + return &ExecuteIOPlan{ + PlanID: planID, + } +} + +func (t *ExecuteIOPlan) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { + log := logger.WithType[ExecuteIOPlan]("Task") + log.Debugf("begin with %v", logger.FormatStruct(t)) + defer log.Debugf("end") + + ret, err := ctx.sw.ExecutePlan(t.PlanID) + if err != nil { + err := fmt.Errorf("executing io plan: %w", err) + log.WithField("PlanID", t.PlanID).Warn(err.Error()) + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + return + } + + t.Result = ret + + complete(nil, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/agent/internal/task/task.go b/agent/internal/task/task.go index 7f0a27a..f41112c 100644 --- a/agent/internal/task/task.go +++ b/agent/internal/task/task.go @@ -3,10 +3,12 @@ package task import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/task" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) type TaskContext struct { distlock *distlock.Service + sw *ioswitch.Switch } // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, @@ -21,8 +23,9 @@ type Task = task.Task[TaskContext] type CompleteOption = task.CompleteOption -func NewManager(distlock *distlock.Service) Manager { +func NewManager(distlock *distlock.Service, sw *ioswitch.Switch) Manager { return task.NewManager(TaskContext{ distlock: distlock, + sw: sw, }) } diff --git a/agent/main.go b/agent/main.go index 29ba784..28e1a8c 100644 --- a/agent/main.go +++ b/agent/main.go @@ -13,6 +13,10 @@ import ( stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + + // TODO 注册OpUnion,但在mq包中注册会造成循环依赖,所以只能放到这里 + _ "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops" "google.golang.org/grpc" @@ -55,15 +59,17 @@ func main() { log.Fatalf("new ipfs failed, err: %s", err.Error()) } + sw := ioswitch.NewSwitch() + //处置协调端、客户端命令(可多建几个) wg := sync.WaitGroup{} wg.Add(5) - taskMgr := task.NewManager(distlock) + taskMgr := task.NewManager(distlock, &sw) // 启动命令服务器 // TODO 需要设计AgentID持久化机制 - agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr), config.Cfg().ID, &config.Cfg().RabbitMQ) + agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr, &sw), config.Cfg().ID, &config.Cfg().RabbitMQ) if err != nil { log.Fatalf("new agent server failed, err: %s", err.Error()) } @@ -83,7 +89,7 @@ func main() { } s := grpc.NewServer() - agtrpc.RegisterAgentServer(s, grpcsvc.NewService()) + agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&sw)) go serveGRPC(s, lis, &wg) go serveDistLock(distlock) diff --git a/client/internal/cmdline/package.go b/client/internal/cmdline/package.go index 46aa0f6..a30d3e3 100644 --- a/client/internal/cmdline/package.go +++ b/client/internal/cmdline/package.go @@ -9,7 +9,6 @@ import ( "github.com/jedib0t/go-pretty/v6/table" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/client/internal/config" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" ) @@ -186,7 +185,9 @@ func PackageUpdateRepPackage(ctx CommandContext, packageID int64, rootPath strin } } -func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, name string, ecName string, nodeAffinity []int64) error { +func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, name string, ecName string, chunkSize int64, nodeAffinity []int64) error { + rootPath = filepath.Clean(rootPath) + var uploadFilePathes []string err := filepath.WalkDir(rootPath, func(fname string, fi os.DirEntry, err error) error { if err != nil { @@ -209,14 +210,14 @@ func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, } objIter := iterator.NewUploadingObjectIterator(rootPath, uploadFilePathes) - taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, cdssdk.NewECRedundancyInfo(ecName, config.Cfg().ECPacketSize), nodeAff) + taskID, err := ctx.Cmdline.Svc.PackageSvc().StartCreatingECPackage(0, bucketID, name, objIter, cdssdk.NewECRedundancyInfo(ecName, chunkSize), nodeAff) if err != nil { return fmt.Errorf("upload file data failed, err: %w", err) } for { - complete, uploadObjectResult, err := ctx.Cmdline.Svc.PackageSvc().WaitCreatingRepPackage(taskID, time.Second*5) + complete, uploadObjectResult, err := ctx.Cmdline.Svc.PackageSvc().WaitCreatingECPackage(taskID, time.Second*5) if complete { if err != nil { return fmt.Errorf("uploading ec package: %w", err) @@ -224,12 +225,11 @@ func PackageUploadECPackage(ctx CommandContext, rootPath string, bucketID int64, tb := table.NewWriter() - tb.AppendHeader(table.Row{"Path", "ObjectID", "FileHash"}) + tb.AppendHeader(table.Row{"Path", "ObjectID"}) for i := 0; i < len(uploadObjectResult.ObjectResults); i++ { tb.AppendRow(table.Row{ uploadObjectResult.ObjectResults[i].Info.Path, uploadObjectResult.ObjectResults[i].ObjectID, - uploadObjectResult.ObjectResults[i].FileHash, }) } fmt.Print(tb.Render()) @@ -319,9 +319,9 @@ func init() { commands.MustAdd(PackageUpdateRepPackage, "pkg", "update", "rep") - commands.MustAdd(PackageUploadRepPackage, "pkg", "new", "ec") + commands.MustAdd(PackageUploadECPackage, "pkg", "new", "ec") - commands.MustAdd(PackageUpdateRepPackage, "pkg", "update", "ec") + commands.MustAdd(PackageUpdateECPackage, "pkg", "update", "ec") commands.MustAdd(PackageDeletePackage, "pkg", "delete") diff --git a/client/internal/config/config.go b/client/internal/config/config.go index b8aa80d..107f7c1 100644 --- a/client/internal/config/config.go +++ b/client/internal/config/config.go @@ -11,14 +11,13 @@ import ( ) type Config struct { - Local stgmodels.LocalMachineInfo `json:"local"` - AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"` - ECPacketSize int64 `json:"ecPacketSize"` - MaxRepCount int `json:"maxRepCount"` - Logger logger.Config `json:"logger"` - RabbitMQ stgmq.Config `json:"rabbitMQ"` - IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon - DistLock distlock.Config `json:"distlock"` + Local stgmodels.LocalMachineInfo `json:"local"` + AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"` + MaxRepCount int `json:"maxRepCount"` + Logger logger.Config `json:"logger"` + RabbitMQ stgmq.Config `json:"rabbitMQ"` + IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon + DistLock distlock.Config `json:"distlock"` } var cfg Config diff --git a/client/internal/services/package.go b/client/internal/services/package.go index b02a771..2677252 100644 --- a/client/internal/services/package.go +++ b/client/internal/services/package.go @@ -172,10 +172,10 @@ func (svc *PackageService) StartCreatingECPackage(userID int64, bucketID int64, return tsk.ID(), nil } -func (svc *PackageService) WaitCreatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreateRepPackageResult, error) { +func (svc *PackageService) WaitCreatingECPackage(taskID string, waitTimeout time.Duration) (bool, *agtcmd.CreateECPackageResult, error) { tsk := svc.TaskMgr.FindByID(taskID) if tsk.WaitTimeout(waitTimeout) { - cteatePkgTask := tsk.Body().(*mytask.CreateRepPackage) + cteatePkgTask := tsk.Body().(*mytask.CreateECPackage) return true, cteatePkgTask.Result, tsk.Error() } return false, nil, nil diff --git a/common/assets/confs/agent.config.json b/common/assets/confs/agent.config.json index 7e1db00..b340adf 100644 --- a/common/assets/confs/agent.config.json +++ b/common/assets/confs/agent.config.json @@ -9,7 +9,6 @@ "ip": "127.0.0.1", "port": 5010 }, - "ecPacketSize": 10, "tempFileLifetime": 3600, "logger": { "output": "file", diff --git a/common/assets/confs/client.config.json b/common/assets/confs/client.config.json index cdc5dd2..81a9ea6 100644 --- a/common/assets/confs/client.config.json +++ b/common/assets/confs/client.config.json @@ -6,7 +6,6 @@ "agentGRPC": { "port": 5010 }, - "ecPacketSize": 10, "maxRepCount": 10, "logger": { "output": "stdout", diff --git a/common/assets/scripts/create_database.sql b/common/assets/scripts/create_database.sql index b2be415..2fdcdfe 100644 --- a/common/assets/scripts/create_database.sql +++ b/common/assets/scripts/create_database.sql @@ -17,16 +17,33 @@ create table Node ( ) comment = '节点表'; insert into - Node (NodeID, Name, LocalIP, ExternalIP, LocalGRPCPort, ExternalGRPCPort, LocationID, State) + Node ( + NodeID, + Name, + LocalIP, + ExternalIP, + LocalGRPCPort, + ExternalGRPCPort, + LocationID, + State + ) values - (1, "localhost", "localhost", "localhost", 5010, 5010, 1, "alive") -create table Storage ( - StorageID int not null auto_increment primary key comment '存储服务ID', - Name varchar(100) not null comment '存储服务名称', - NodeID int not null comment '存储服务所在节点的ID', - Directory varchar(4096) not null comment '存储服务所在节点的目录', - State varchar(100) comment '状态' -) comment = "存储服务表"; + ( + 1, + "localhost", + "localhost", + "localhost", + 5010, + 5010, + 1, + "alive" + ) create table Storage ( + StorageID int not null auto_increment primary key comment '存储服务ID', + Name varchar(100) not null comment '存储服务名称', + NodeID int not null comment '存储服务所在节点的ID', + Directory varchar(4096) not null comment '存储服务所在节点的目录', + State varchar(100) comment '状态' + ) comment = "存储服务表"; insert into Storage (StorageID, Name, NodeID, Directory, State) @@ -145,7 +162,7 @@ values (1, "Local"); create table Ec ( - EcID int not null comment '纠删码ID', + EcID int not null primary key comment '纠删码ID', Name varchar(128) not null comment '纠删码名称', EcK int not null comment 'ecK', EcN int not null comment 'ecN' diff --git a/common/models/models.go b/common/models/models.go index 5ba1a08..6296953 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -4,39 +4,22 @@ import "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" /// TODO 将分散在各处的公共结构体定义集中到这里来 -type RedundancyData interface{} -type RedundancyDataConst interface { - RepRedundancyData | ECRedundancyData | RedundancyData -} -type RepRedundancyData struct { - FileHash string `json:"fileHash"` -} - -func NewRedundancyRepData(fileHash string) RepRedundancyData { - return RepRedundancyData{ - FileHash: fileHash, - } -} - -type ECRedundancyData struct { - Ec EC `json:"ec"` - Blocks []ObjectBlockData `json:"blocks"` +type EC struct { + ID int64 `json:"id"` + K int `json:"k"` + N int `json:"n"` + ChunkSize int64 `json:"chunkSize"` } -func NewRedundancyEcData(ec EC, blocks []ObjectBlockData) ECRedundancyData { - return ECRedundancyData{ - Ec: ec, - Blocks: blocks, +func NewEc(id int64, k int, n int, chunkSize int64) EC { + return EC{ + ID: id, + K: k, + N: n, + ChunkSize: chunkSize, } } -type EC struct { - ID int `json:"id"` - Name string `json:"name"` - EcK int `json:"ecK"` - EcN int `json:"ecN"` -} - type ObjectBlockData struct { Index int `json:"index"` FileHash string `json:"fileHash"` @@ -51,15 +34,6 @@ func NewObjectBlockData(index int, fileHash string, nodeIDs []int64) ObjectBlock } } -func NewEc(id int, name string, ecK int, ecN int) EC { - return EC{ - ID: id, - Name: name, - EcK: ecK, - EcN: ecN, - } -} - type ObjectRepData struct { Object model.Object `json:"object"` FileHash string `json:"fileHash"` diff --git a/common/pkgs/cmd/create_ec_package.go b/common/pkgs/cmd/create_ec_package.go index 1e7743d..fd77010 100644 --- a/common/pkgs/cmd/create_ec_package.go +++ b/common/pkgs/cmd/create_ec_package.go @@ -4,14 +4,13 @@ import ( "fmt" "io" "math/rand" - "os" - "path/filepath" "sync" "github.com/samber/lo" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + myio "gitlink.org.cn/cloudream/common/utils/io" stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder" @@ -185,175 +184,65 @@ func uploadAndUpdateECPackage(packageID int64, objectIter iterator.UploadingObje } // 上传文件 -func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ecInfo cdssdk.ECRedundancyInfo, ec model.Ec) ([]string, []int64, error) { - //生成纠删码的写入节点序列 - nodes := make([]UploadNodeInfo, ec.EcN) - numNodes := len(uploadNodes) - startWriteNodeID := rand.Intn(numNodes) - for i := 0; i < ec.EcN; i++ { - nodes[i] = uploadNodes[(startWriteNodeID+i)%numNodes] - } +func uploadECObject(obj *iterator.IterUploadingObject, uploadNodes []UploadNodeInfo, ecInfo cdssdk.ECRedundancyInfo, ecMod model.Ec) ([]string, []int64, error) { + uploadNodes = shuffleNodes(uploadNodes, ecMod.EcN) - hashs, err := ecWrite(obj.File, obj.Size, ecInfo.PacketSize, ec.EcK, ec.EcN, nodes) + rs, err := ec.NewRs(ecMod.EcK, ecMod.EcN, ecInfo.ChunkSize) if err != nil { - return nil, nil, fmt.Errorf("EcWrite failed, err: %w", err) + return nil, nil, err } - nodeIDs := make([]int64, len(nodes)) - for i := 0; i < len(nodes); i++ { - nodeIDs[i] = nodes[i].Node.NodeID + outputs := myio.ChunkedSplit(obj.File, ecInfo.ChunkSize, ecMod.EcK, myio.ChunkedSplitOption{ + FillZeros: true, + }) + var readers []io.Reader + for _, o := range outputs { + readers = append(readers, o) } + defer func() { + for _, o := range outputs { + o.Close() + } + }() - return hashs, nodeIDs, nil -} + encStrs := rs.EncodeAll(readers) -// chooseUploadNode 选择一个上传文件的节点 -// 1. 从与当前客户端相同地域的节点中随机选一个 -// 2. 没有用的话从所有节点中随机选一个 -func (t *CreateECPackage) chooseUploadNode(nodes []UploadNodeInfo) UploadNodeInfo { - sameLocationNodes := lo.Filter(nodes, func(e UploadNodeInfo, i int) bool { return e.IsSameLocation }) - if len(sameLocationNodes) > 0 { - return sameLocationNodes[rand.Intn(len(sameLocationNodes))] - } + wg := sync.WaitGroup{} - return nodes[rand.Intn(len(nodes))] -} + nodeIDs := make([]int64, ecMod.EcN) + fileHashes := make([]string, ecMod.EcN) + anyErrs := make([]error, ecMod.EcN) -func ecWrite(file io.ReadCloser, fileSize int64, packetSize int64, ecK int, ecN int, nodes []UploadNodeInfo) ([]string, error) { - // TODO 需要参考RepWrite函数的代码逻辑,做好错误处理 - //获取文件大小 - - var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} //2应替换为ecK,3应替换为ecN - //计算每个块的packet数 - numPacket := (fileSize + int64(ecK)*packetSize - 1) / (int64(ecK) * packetSize) - //fmt.Println(numPacket) - //创建channel - loadBufs := make([]chan []byte, ecN) - encodeBufs := make([]chan []byte, ecN) - for i := 0; i < ecN; i++ { - loadBufs[i] = make(chan []byte) - } - for i := 0; i < ecN; i++ { - encodeBufs[i] = make(chan []byte) - } - hashs := make([]string, ecN) - //正式开始写入 - go load(file, loadBufs[:ecN], ecK, numPacket*int64(ecK), packetSize) //从本地文件系统加载数据 - go encode(loadBufs[:ecN], encodeBufs[:ecN], ecK, coefs, numPacket) - - var wg sync.WaitGroup - wg.Add(ecN) - - for idx := 0; idx < ecN; idx++ { - i := idx - reader := channelBytesReader{ - channel: encodeBufs[idx], - packetCount: numPacket, - } + for i := range encStrs { + idx := i + wg.Add(1) + nodeIDs[idx] = uploadNodes[idx].Node.NodeID go func() { - // TODO 处理错误 - fileHash, _ := uploadFile(&reader, nodes[i]) - hashs[i] = fileHash - wg.Done() + defer wg.Done() + fileHashes[idx], anyErrs[idx] = uploadFile(encStrs[idx], uploadNodes[idx]) }() } - wg.Wait() - - return hashs, nil -} - -func load(file io.ReadCloser, loadBufs []chan []byte, ecK int, totalNumPacket int64, ecPacketSize int64) error { - - for i := 0; int64(i) < totalNumPacket; i++ { - - buf := make([]byte, ecPacketSize) - idx := i % ecK - _, err := file.Read(buf) - if err != nil { - return fmt.Errorf("read file falied, err:%w", err) - } - loadBufs[idx] <- buf - - if idx == ecK-1 { - for j := ecK; j < len(loadBufs); j++ { - zeroPkt := make([]byte, ecPacketSize) - loadBufs[j] <- zeroPkt - } - } - if err != nil && err != io.EOF { - return fmt.Errorf("load file to buf failed, err:%w", err) - } - } - for i := 0; i < len(loadBufs); i++ { - - close(loadBufs[i]) - } - file.Close() - return nil -} + wg.Wait() -func encode(inBufs []chan []byte, outBufs []chan []byte, ecK int, coefs [][]int64, numPacket int64) { - var tmpIn [][]byte - tmpIn = make([][]byte, len(outBufs)) - enc := ec.NewRsEnc(ecK, len(outBufs)) - for i := 0; int64(i) < numPacket; i++ { - for j := 0; j < len(outBufs); j++ { - tmpIn[j] = <-inBufs[j] + for i, e := range anyErrs { + if e != nil { + return nil, nil, fmt.Errorf("uploading file to node %d: %w", uploadNodes[i].Node.NodeID, e) } - enc.Encode(tmpIn) - for j := 0; j < len(outBufs); j++ { - outBufs[j] <- tmpIn[j] - } - } - for i := 0; i < len(outBufs); i++ { - close(outBufs[i]) } -} -type channelBytesReader struct { - channel chan []byte - packetCount int64 - readingData []byte + return fileHashes, nodeIDs, nil } -func (r *channelBytesReader) Read(buf []byte) (int, error) { - if len(r.readingData) == 0 { - if r.packetCount == 0 { - return 0, io.EOF - } - - r.readingData = <-r.channel - r.packetCount-- +func shuffleNodes(uploadNodes []UploadNodeInfo, extendTo int) []UploadNodeInfo { + for i := len(uploadNodes); i < extendTo; i++ { + uploadNodes = append(uploadNodes, uploadNodes[rand.Intn(len(uploadNodes))]) } - len := copy(buf, r.readingData) - r.readingData = r.readingData[:len] - - return len, nil -} + // 随机排列上传节点 + rand.Shuffle(len(uploadNodes), func(i, j int) { + uploadNodes[i], uploadNodes[j] = uploadNodes[j], uploadNodes[i] + }) -func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *sync.WaitGroup) { - fDir, err := os.Executable() - if err != nil { - panic(err) - } - fURL := filepath.Join(filepath.Dir(fDir), "assets") - _, err = os.Stat(fURL) - if os.IsNotExist(err) { - os.MkdirAll(fURL, os.ModePerm) - } - file, err := os.Create(filepath.Join(fURL, localFilePath)) - if err != nil { - return - } - for i := 0; int64(i) < numPacket; i++ { - for j := 0; j < len(inBuf); j++ { - tmp := <-inBuf[j] - fmt.Println(tmp) - file.Write(tmp) - } - } - file.Close() - wg.Done() + return uploadNodes } diff --git a/common/pkgs/db/object_block.go b/common/pkgs/db/object_block.go index d35bedf..55ee3e5 100644 --- a/common/pkgs/db/object_block.go +++ b/common/pkgs/db/object_block.go @@ -19,7 +19,7 @@ func (db *DB) ObjectBlock() *ObjectBlockDB { } func (db *ObjectBlockDB) Create(ctx SQLContext, objectID int64, index int, fileHash string) error { - _, err := ctx.Exec("insert into ObjectBlock(ObjectID, Index, FileHash) values(?,?,?)", objectID, index, fileHash) + _, err := ctx.Exec("insert into ObjectBlock values(?,?,?)", objectID, index, fileHash) return err } diff --git a/common/pkgs/ec/rs_test.go b/common/pkgs/ec/rs_test.go index 415cd24..fb60f28 100644 --- a/common/pkgs/ec/rs_test.go +++ b/common/pkgs/ec/rs_test.go @@ -2,220 +2,262 @@ package ec import ( "bytes" - "fmt" "io" - "io/ioutil" + "sync" "testing" - "gitlink.org.cn/cloudream/common/pkgs/ipfs" - //"gitlink.org.cn/cloudream/common/pkgs/ipfs" - //"gitlink.org.cn/cloudream/storage/agent/internal/config" - //stgglb "gitlink.org.cn/cloudream/storage/common/globals" - stgglb "gitlink.org.cn/cloudream/storage/common/globals" + . "github.com/smartystreets/goconvey/convey" ) -func test_Encode(t *testing.T) { - enc, _ := NewRs(3, 5, 10) - rc := make([]io.ReadCloser, 3) - rc[0] = ioutil.NopCloser(bytes.NewBufferString("11111111")) - rc[1] = ioutil.NopCloser(bytes.NewBufferString("22222222")) - rc[2] = ioutil.NopCloser(bytes.NewBufferString("33333333")) - /*rc[0].Close() - rc[1].Close() - rc[2].Close()*/ - print("#$$$$$$$$$$$") - out, _ := enc.ReconstructData(rc, []int{0, 1, 2}) - //out, _ := enc.Encode(rc) - buf := make([]byte, 100) - out[0].Read(buf) - fmt.Println(buf) - out[1].Read(buf) - fmt.Println(buf) - t.Logf(string(buf)) - t.Log(buf) -} +func Test_EncodeReconstruct(t *testing.T) { + Convey("编码后使用校验块重建数据", t, func() { + rs, err := NewRs(2, 3, 5) + So(err, ShouldBeNil) -/* ------------------------------------------------- -hash:QmX49sGugmtVPfNo13q84YL1NwGmr5yzWDDmJZ7PniQ9b6 -内容:1111122222233333333334444444445663454543534534 - -hash:QmcN1EJm2w9XT62Q9YqA5Ym7YDzjmnqJYc565bzRs5VosW -(5,3),chunkSize:6 -data1:QmS2t7xFgTMTX2DGYsbDdmHnGvaG6sc7D9k1R2WZyuDx56 -data2:QmUSZvuABjfGKF1c4VxvVBdH31SroDm2QyLGBrVFomRM8P -data3:QmcD3RpUh5rwMhf9yBywBeT6ibT1P5DSJC67aoD77jhTBn -内容:qqqqqqqqwwwwwwwwwwwwwweeeeeeeeeeeeerrrrrrrrrrr ------------------------------------------------------ -*/ -func test_Fetch(t *testing.T) { - - blkReader, _ := NewBlockReader() - /*****************************FetchBlock*************************/ - /*r, _ := blkReader.FetchBLock("QmX49sGugmtVPfNo13q84YL1NwGmr5yzWDDmJZ7PniQ9b6") - data, _ := ioutil.ReadAll(r) - t.Logf(string(data))*/ - - /**********************FetchBlocks************************************ - hashs := []string{"QmcN1EJm2w9XT62Q9YqA5Ym7YDzjmnqJYc565bzRs5VosW", "QmX49sGugmtVPfNo13q84YL1NwGmr5yzWDDmJZ7PniQ9b6"} - rs, _ := blkReader.FetchBLocks(hashs) - data1, _ := ioutil.ReadAll(rs[0]) - data2, _ := ioutil.ReadAll(rs[1]) - t.Logf(string(data1)) - t.Logf(string(data2)) - /*************************JumpFetchBlock*********************************/ - blkReader.SetJumpRead("QmcN1EJm2w9XT62Q9YqA5Ym7YDzjmnqJYc565bzRs5VosW", 46, 3) - blkReader.SetchunkSize(6) - r, _ := blkReader.JumpFetchBlock(1) - data, _ := ioutil.ReadAll(r) - t.Logf(string(data)) -} -func test_Fetch_and_Encode(t *testing.T) { - chunkSize := int64(6) - blkReader, _ := NewBlockReader() - defer blkReader.Close() - blkReader.SetJumpRead("QmcN1EJm2w9XT62Q9YqA5Ym7YDzjmnqJYc565bzRs5VosW", 46, 3) - blkReader.SetchunkSize(int64(chunkSize)) - dataBlocks := make([]io.ReadCloser, 3) - for i := range dataBlocks { - dataBlocks[i], _ = blkReader.JumpFetchBlock(i) - } - enc, _ := NewRs(3, 5, chunkSize) - parityBlocks, _ := enc.Encode(dataBlocks) - - parityData := make([]string, 2) - finished := false - for { - if finished { - break + outputs := rs.EncodeAll([]io.Reader{ + bytes.NewReader([]byte{1, 2, 3, 4, 5}), + bytes.NewReader([]byte{6, 7, 8, 9, 10}), + }) + + var outputData = [][]byte{ + make([]byte, 5), + make([]byte, 5), + make([]byte, 5), } - buf := make([]byte, chunkSize) - for i, pipe := range parityBlocks { - _, err := pipe.Read(buf) - if err != nil { - finished = true - break - } - parityData[i] = parityData[i] + string(buf) + + { // 编码所有块 + errs := make([]error, 3) + + wg := sync.WaitGroup{} + for i := range outputs { + idx := i + + wg.Add(1) + go func() { + defer wg.Done() + _, err := io.ReadFull(outputs[idx], outputData[idx]) + errs[idx] = err + }() + } + + wg.Wait() + + for _, e := range errs { + if e != io.EOF { + So(e, ShouldBeNil) + } + } + + So(outputData[0], ShouldResemble, []byte{1, 2, 3, 4, 5}) + So(outputData[1], ShouldResemble, []byte{6, 7, 8, 9, 10}) } - } - t.Logf(parityData[0]) - t.Logf(parityData[1]) -} + { // 重建所有数据块 + recOutputs := rs.ReconstructData([]io.Reader{ + bytes.NewBuffer(outputData[1]), + bytes.NewBuffer(outputData[2]), + }, []int{1, 2}) + + recOutputData := [][]byte{ + make([]byte, 5), + make([]byte, 5), + } + errs := make([]error, 2) + + wg := sync.WaitGroup{} + for i := range recOutputs { + idx := i + + wg.Add(1) + go func() { + defer wg.Done() + _, err := io.ReadFull(recOutputs[idx], recOutputData[idx]) + errs[idx] = err + }() + } + + wg.Wait() -func test_Fetch_and_Encode_and_Degraded(t *testing.T) { - chunkSize := int64(6) - blkReader, _ := NewBlockReader() - defer blkReader.Close() - blkReader.SetJumpRead("QmcN1EJm2w9XT62Q9YqA5Ym7YDzjmnqJYc565bzRs5VosW", 46, 3) - blkReader.SetchunkSize(int64(chunkSize)) - dataBlocks := make([]io.ReadCloser, 3) - for i := range dataBlocks { - dataBlocks[i], _ = blkReader.JumpFetchBlock(i) - } - enc, _ := NewRs(3, 5, chunkSize) - parityBlocks, _ := enc.Encode(dataBlocks) - go func() { - ioutil.ReadAll(parityBlocks[0]) - }() - degradedBlocks := make([]io.ReadCloser, 3) - degradedBlocks[0], _ = blkReader.JumpFetchBlock(1) - degradedBlocks[1], _ = blkReader.JumpFetchBlock(2) - degradedBlocks[2] = parityBlocks[1] - newDataBlocks, _ := enc.ReconstructData(degradedBlocks, []int{1, 2, 4}) - newData := make([]string, 3) - finished := false - for { - if finished { - break + for _, e := range errs { + if e != io.EOF { + So(e, ShouldBeNil) + } + } + + So(recOutputData[0], ShouldResemble, []byte{1, 2, 3, 4, 5}) + So(recOutputData[1], ShouldResemble, []byte{6, 7, 8, 9, 10}) } - buf := make([]byte, chunkSize) - for i, pipe := range newDataBlocks { - _, err := pipe.Read(buf) - if err != nil { - finished = true - break - } - newData[i] = newData[i] + string(buf) + + { // 重建指定的数据块 + recOutputs := rs.ReconstructSome([]io.Reader{ + bytes.NewBuffer(outputData[1]), + bytes.NewBuffer(outputData[2]), + }, []int{1, 2}, []int{0, 1}) + + recOutputData := [][]byte{ + make([]byte, 5), + make([]byte, 5), + } + errs := make([]error, 2) + + wg := sync.WaitGroup{} + for i := range recOutputs { + idx := i + + wg.Add(1) + go func() { + defer wg.Done() + _, err := io.ReadFull(recOutputs[idx], recOutputData[idx]) + errs[idx] = err + }() + } + + wg.Wait() + + for _, e := range errs { + if e != io.EOF { + So(e, ShouldBeNil) + } + } + + So(recOutputData[0], ShouldResemble, []byte{1, 2, 3, 4, 5}) + So(recOutputData[1], ShouldResemble, []byte{6, 7, 8, 9, 10}) } - } - t.Logf(newData[0]) - t.Logf(newData[1]) - t.Logf(newData[2]) -} + { // 重建指定的数据块 + recOutputs := rs.ReconstructSome([]io.Reader{ + bytes.NewBuffer(outputData[1]), + bytes.NewBuffer(outputData[2]), + }, []int{1, 2}, []int{0}) -func test_pin_data_blocks(t *testing.T) { - chunkSize := int64(6) - blkReader, _ := NewBlockReader() - defer blkReader.Close() - blkReader.SetJumpRead("QmcN1EJm2w9XT62Q9YqA5Ym7YDzjmnqJYc565bzRs5VosW", 46, 3) - blkReader.SetchunkSize(int64(chunkSize)) - dataBlocks := make([]io.ReadCloser, 3) - ipfsclient, _ := stgglb.IPFSPool.Acquire() - for i := range dataBlocks { - dataBlocks[i], _ = blkReader.JumpFetchBlock(i) - hash, _ := ipfsclient.CreateFile(dataBlocks[i]) - t.Logf(hash) - } + recOutputData := [][]byte{ + make([]byte, 5), + } + errs := make([]error, 2) -} + wg := sync.WaitGroup{} + for i := range recOutputs { + idx := i + + wg.Add(1) + go func() { + defer wg.Done() + _, err := io.ReadFull(recOutputs[idx], recOutputData[idx]) + errs[idx] = err + }() + } + + wg.Wait() -func print_ioreaders(t *testing.T, readers []io.ReadCloser, chunkSize int64) { - newData := make([]string, len(readers)) - finished := false - for { - if finished { - break + for _, e := range errs { + if e != io.EOF { + So(e, ShouldBeNil) + } + } + + So(recOutputData[0], ShouldResemble, []byte{1, 2, 3, 4, 5}) } - buf := make([]byte, chunkSize) - for i, pipe := range readers { - _, err := pipe.Read(buf) - if err != nil { - finished = true - break - } - newData[i] = newData[i] + string(buf) + + { // 重建指定的数据块 + recOutputs := rs.ReconstructSome([]io.Reader{ + bytes.NewBuffer(outputData[1]), + bytes.NewBuffer(outputData[2]), + }, []int{1, 2}, []int{1}) + + recOutputData := [][]byte{ + make([]byte, 5), + } + errs := make([]error, 2) + + wg := sync.WaitGroup{} + for i := range recOutputs { + idx := i + + wg.Add(1) + go func() { + defer wg.Done() + _, err := io.ReadFull(recOutputs[idx], recOutputData[idx]) + errs[idx] = err + }() + } + + wg.Wait() + + for _, e := range errs { + if e != io.EOF { + So(e, ShouldBeNil) + } + } + + So(recOutputData[0], ShouldResemble, []byte{6, 7, 8, 9, 10}) } - } - for _, data := range newData { - t.Logf(data) - } -} -func test_reconstructData(t *testing.T) { - blkReader, _ := NewBlockReader() - defer blkReader.Close() - hashs := []string{"QmS2t7xFgTMTX2DGYsbDdmHnGvaG6sc7D9k1R2WZyuDx56", "QmUSZvuABjfGKF1c4VxvVBdH31SroDm2QyLGBrVFomRM8P", "QmcD3RpUh5rwMhf9yBywBeT6ibT1P5DSJC67aoD77jhTBn"} - dataBlocks, _ := blkReader.FetchBLocks(hashs) - chunkSize := int64(6) - enc, _ := NewRs(3, 5, chunkSize) - print("@@@@@@@@@") - newDataBlocks, _ := enc.ReconstructSome(dataBlocks, []int{0, 1, 2}, []int{3, 4}) - print("!!!!!!!!!") - print_ioreaders(t, newDataBlocks, chunkSize) -} -func Test_main(t *testing.T) { - //test_Encode(t) - //stgglb.InitLocal(&config.Cfg().Local) - stgglb.InitIPFSPool(&ipfs.Config{Port: 5001}) - //test_Fetch(t) - //test_Fetch_and_Encode(t) - //test_Fetch_and_Encode_and_Degraded(t) - //test_pin_data_blocks(t) - test_reconstructData(t) -} + { // 单独产生校验块 + encOutputs := rs.Encode([]io.Reader{ + bytes.NewBuffer(outputData[0]), + bytes.NewBuffer(outputData[1]), + }) + + encOutputData := [][]byte{ + make([]byte, 5), + } + errs := make([]error, 2) + + wg := sync.WaitGroup{} + for i := range encOutputs { + idx := i -/* -func Test_Fetch_Encode_ReconstructData(t *testing.T) { - inFileName := "test.txt" - enc, _ := NewRs(3, 5, 10) - file, err := os.Open(inFileName) - if err != nil { - t.Error(err) - } - var data io.ReadCloser - data = file - //enc.Encode(data) -}*/ + wg.Add(1) + go func() { + defer wg.Done() + _, err := io.ReadFull(encOutputs[idx], encOutputData[idx]) + errs[idx] = err + }() + } + + wg.Wait() + + for _, e := range errs { + if e != io.EOF { + So(e, ShouldBeNil) + } + } + + So(encOutputData[0], ShouldResemble, outputData[2]) + } + + { // 使用ReconstructAny单独重建校验块 + encOutputs := rs.ReconstructAny([]io.Reader{ + bytes.NewBuffer(outputData[0]), + bytes.NewBuffer(outputData[1]), + }, []int{0, 1}, []int{2}) + + encOutputData := [][]byte{ + make([]byte, 5), + } + errs := make([]error, 2) + + wg := sync.WaitGroup{} + for i := range encOutputs { + idx := i + + wg.Add(1) + go func() { + defer wg.Done() + _, err := io.ReadFull(encOutputs[idx], encOutputData[idx]) + errs[idx] = err + }() + } + + wg.Wait() + + for _, e := range errs { + if e != io.EOF { + So(e, ShouldBeNil) + } + } + + So(encOutputData[0], ShouldResemble, outputData[2]) + } + }) +} diff --git a/common/pkgs/ec/stream_rs.go b/common/pkgs/ec/stream_rs.go index 02402fb..c1d68e1 100644 --- a/common/pkgs/ec/stream_rs.go +++ b/common/pkgs/ec/stream_rs.go @@ -4,6 +4,7 @@ import ( "io" "github.com/klauspost/reedsolomon" + myio "gitlink.org.cn/cloudream/common/utils/io" ) type Rs struct { @@ -26,192 +27,246 @@ func NewRs(k int, n int, chunkSize int64) (*Rs, error) { return &enc, err } -// 编码 -func (r *Rs) Encode(data []io.ReadCloser) ([]io.ReadCloser, error) { - output := make([]io.ReadCloser, r.ecP) - parity := make([]*io.PipeWriter, r.ecP) - for i := range output { - var reader *io.PipeReader - reader, parity[i] = io.Pipe() - output[i] = reader +// 编码。仅输出校验块 +func (r *Rs) Encode(input []io.Reader) []io.ReadCloser { + outReaders := make([]io.ReadCloser, r.ecP) + outWriters := make([]*io.PipeWriter, r.ecP) + for i := 0; i < r.ecP; i++ { + outReaders[i], outWriters[i] = io.Pipe() } + go func() { chunks := make([][]byte, r.ecN) - for i := range chunks { - chunks[i] = make([]byte, r.chunkSize) + for idx := 0; idx < r.ecN; idx++ { + chunks[idx] = make([]byte, r.chunkSize) } + + var closeErr error + loop: for { - finished := false - //读数据块到buff + //读块到buff for i := 0; i < r.ecK; i++ { - _, err := data[i].Read(chunks[i]) + _, err := io.ReadFull(input[i], chunks[i]) if err != nil { - finished = true - break + closeErr = err + break loop } } - if finished { - break - } - //编码 + err := r.encoder.Encode(chunks) if err != nil { return } - //输出到writer - for i := r.ecK; i < r.ecN; i++ { - parity[i-r.ecK].Write(chunks[i]) + + //输出到outWriter + for i := range outWriters { + err := myio.WriteAll(outWriters[i], chunks[i+r.ecK]) + if err != nil { + closeErr = err + break loop + } } } - for i := range data { - data[i].Close() - } - for i := range parity { - parity[i].Close() + + for i := range outWriters { + outWriters[i].CloseWithError(closeErr) } }() - return output, nil + + return outReaders } -// 降级读,任意k个块恢复出原始数据块 -func (r *Rs) ReconstructData(input []io.ReadCloser, inBlockIdx []int) ([]io.ReadCloser, error) { - dataReader := make([]io.ReadCloser, r.ecK) - dataWriter := make([]*io.PipeWriter, r.ecK) - for i := 0; i < r.ecK; i++ { - var reader *io.PipeReader - reader, dataWriter[i] = io.Pipe() - dataReader[i] = reader +// 编码。输出包含所有的数据块和校验块 +func (r *Rs) EncodeAll(input []io.Reader) []io.ReadCloser { + outReaders := make([]io.ReadCloser, r.ecN) + outWriters := make([]*io.PipeWriter, r.ecN) + for i := 0; i < r.ecN; i++ { + outReaders[i], outWriters[i] = io.Pipe() } + go func() { chunks := make([][]byte, r.ecN) - for i := range chunks { - chunks[i] = make([]byte, r.chunkSize) - } - constructIdx := make([]bool, r.ecN) - for i := 0; i < r.ecN; i++ { - constructIdx[i] = false - } - for i := 0; i < r.ecK; i++ { - constructIdx[inBlockIdx[i]] = true - } - nilIdx := make([]int, r.ecP) - ct := 0 - for i := 0; i < r.ecN; i++ { - if !constructIdx[i] { - nilIdx[ct] = i - ct++ - } + for idx := 0; idx < r.ecN; idx++ { + chunks[idx] = make([]byte, r.chunkSize) } + var closeErr error + loop: for { - finished := false - - //读数据块到buff + //读块到buff for i := 0; i < r.ecK; i++ { - _, err := input[i].Read(chunks[inBlockIdx[i]]) + _, err := io.ReadFull(input[i], chunks[i]) if err != nil { - finished = true - break + closeErr = err + break loop } } - for i := 0; i < r.ecP; i++ { - chunks[nilIdx[i]] = nil - } - if finished { - break - } - //解码 - err := r.encoder.ReconstructData(chunks) + + err := r.encoder.Encode(chunks) if err != nil { return } - //输出到writer - for i := 0; i < r.ecK; i++ { - dataWriter[i].Write(chunks[i]) + + //输出到outWriter + for i := range outWriters { + err := myio.WriteAll(outWriters[i], chunks[i]) + if err != nil { + closeErr = err + break loop + } } } - for i := range input { - input[i].Close() - } - for i := range dataWriter { - dataWriter[i].Close() + + for i := range outWriters { + outWriters[i].CloseWithError(closeErr) } }() - return dataReader, nil + + return outReaders } -// 修复,任意k个块恢复若干想要的块 -func (r *Rs) ReconstructSome(input []io.ReadCloser, inBlockIdx []int, outBlockIdx []int) ([]io.ReadCloser, error) { - outReader := make([]io.ReadCloser, len(outBlockIdx)) - outWriter := make([]*io.PipeWriter, len(outBlockIdx)) +// 降级读,任意k个块恢复出所有原始的数据块。 +func (r *Rs) ReconstructData(input []io.Reader, inBlockIdx []int) []io.ReadCloser { + outIndexes := make([]int, r.ecK) + for i := 0; i < r.ecK; i++ { + outIndexes[i] = i + } + + return r.ReconstructSome(input, inBlockIdx, outIndexes) +} + +// 修复,任意k个块恢复指定的数据块。 +// 调用者应该保证input的每一个流长度相同,且均为chunkSize的整数倍 +func (r *Rs) ReconstructSome(input []io.Reader, inBlockIdx []int, outBlockIdx []int) []io.ReadCloser { + outReaders := make([]io.ReadCloser, len(outBlockIdx)) + outWriters := make([]*io.PipeWriter, len(outBlockIdx)) for i := 0; i < len(outBlockIdx); i++ { - var reader *io.PipeReader - reader, outWriter[i] = io.Pipe() - outReader[i] = reader + outReaders[i], outWriters[i] = io.Pipe() } + go func() { chunks := make([][]byte, r.ecN) - for i := range chunks { - chunks[i] = make([]byte, r.chunkSize) + // 只初始化输入的buf,输出的buf在调用重建函数之后,会自动建立出来 + for _, idx := range inBlockIdx { + chunks[idx] = make([]byte, r.chunkSize) } - finished := false + //outBools:要输出的若干块idx outBools := make([]bool, r.ecN) - for i := range outBools { - outBools[i] = false - } - for i := range outBlockIdx { - outBools[outBlockIdx[i]] = true - } - constructIdx := make([]bool, r.ecN) - for i := 0; i < r.ecN; i++ { - constructIdx[i] = false - } - for i := 0; i < r.ecK; i++ { - constructIdx[inBlockIdx[i]] = true - } - //nil Idx就是没有输入的块idx,要置成nil - nilIdx := make([]int, r.ecP) - ct := 0 - for i := 0; i < r.ecN; i++ { - if !constructIdx[i] { - nilIdx[ct] = i - ct++ - } + for _, idx := range outBlockIdx { + outBools[idx] = true } + inBools := make([]bool, r.ecN) + for _, idx := range inBlockIdx { + inBools[idx] = true + } + + var closeErr error + loop: for { //读块到buff for i := 0; i < r.ecK; i++ { - _, err := input[i].Read(chunks[inBlockIdx[i]]) + _, err := io.ReadFull(input[i], chunks[inBlockIdx[i]]) if err != nil { - finished = true - break + closeErr = err + break loop } } - for i := 0; i < r.ecP; i++ { - chunks[nilIdx[i]] = nil - } - if finished { - break - } - //解码 err := r.encoder.ReconstructSome(chunks, outBools) if err != nil { return } + //输出到outWriter for i := range outBlockIdx { - outWriter[i].Write(chunks[outBlockIdx[i]]) + err := myio.WriteAll(outWriters[i], chunks[outBlockIdx[i]]) + if err != nil { + closeErr = err + break loop + } + + // 设置buf长度为0,cap不会受影响。注:如果一个块既是输入又是输出,那不能清空这个块 + if !inBools[outBlockIdx[i]] { + chunks[outBlockIdx[i]] = chunks[outBlockIdx[i]][:0] + } } } - for i := range input { - input[i].Close() + + for i := range outWriters { + outWriters[i].CloseWithError(closeErr) + } + }() + + return outReaders +} + +// 重建任意块,包括数据块和校验块。 +// 当前的实现会把不需要的块都重建出来,所以应该避免使用这个函数。 +func (r *Rs) ReconstructAny(input []io.Reader, inBlockIdxes []int, outBlockIdxes []int) []io.ReadCloser { + outReaders := make([]io.ReadCloser, len(outBlockIdxes)) + outWriters := make([]*io.PipeWriter, len(outBlockIdxes)) + for i := 0; i < len(outBlockIdxes); i++ { + outReaders[i], outWriters[i] = io.Pipe() + } + + go func() { + chunks := make([][]byte, r.ecN) + // 只初始化输入的buf,输出的buf在调用重建函数之后,会自动建立出来 + for _, idx := range inBlockIdxes { + chunks[idx] = make([]byte, r.chunkSize) + } + + //outBools:要输出的若干块idx + outBools := make([]bool, r.ecN) + for _, idx := range outBlockIdxes { + outBools[idx] = true + } + + inBools := make([]bool, r.ecN) + for _, idx := range inBlockIdxes { + inBools[idx] = true + } + + var closeErr error + loop: + for { + //读块到buff + for i := 0; i < r.ecK; i++ { + _, err := io.ReadFull(input[i], chunks[inBlockIdxes[i]]) + if err != nil { + closeErr = err + break loop + } + } + + err := r.encoder.Reconstruct(chunks) + if err != nil { + return + } + + //输出到outWriter + for i := range outBlockIdxes { + outIndex := outBlockIdxes[i] + + err := myio.WriteAll(outWriters[i], chunks[outIndex]) + if err != nil { + closeErr = err + break loop + } + + // 设置buf长度为0,cap不会受影响。注:如果一个块既是输入又是输出,那不能清空这个块 + if !inBools[outIndex] { + chunks[outIndex] = chunks[outIndex][:0] + } + } } - for i := range outWriter { - outWriter[i].Close() + + for i := range outWriters { + outWriters[i].CloseWithError(closeErr) } }() - return outReader, nil + + return outReaders } diff --git a/common/pkgs/grpc/agent/agent.pb.go b/common/pkgs/grpc/agent/agent.pb.go index d90d461..c5f933b 100644 --- a/common/pkgs/grpc/agent/agent.pb.go +++ b/common/pkgs/grpc/agent/agent.pb.go @@ -22,49 +22,52 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -type FileDataPacketType int32 +type StreamDataPacketType int32 const ( - FileDataPacketType_Data FileDataPacketType = 0 - FileDataPacketType_EOF FileDataPacketType = 1 + StreamDataPacketType_EOF StreamDataPacketType = 0 + StreamDataPacketType_Data StreamDataPacketType = 1 + StreamDataPacketType_SendArgs StreamDataPacketType = 2 ) -// Enum value maps for FileDataPacketType. +// Enum value maps for StreamDataPacketType. var ( - FileDataPacketType_name = map[int32]string{ - 0: "Data", - 1: "EOF", + StreamDataPacketType_name = map[int32]string{ + 0: "EOF", + 1: "Data", + 2: "SendArgs", } - FileDataPacketType_value = map[string]int32{ - "Data": 0, - "EOF": 1, + StreamDataPacketType_value = map[string]int32{ + "EOF": 0, + "Data": 1, + "SendArgs": 2, } ) -func (x FileDataPacketType) Enum() *FileDataPacketType { - p := new(FileDataPacketType) +func (x StreamDataPacketType) Enum() *StreamDataPacketType { + p := new(StreamDataPacketType) *p = x return p } -func (x FileDataPacketType) String() string { +func (x StreamDataPacketType) String() string { return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) } -func (FileDataPacketType) Descriptor() protoreflect.EnumDescriptor { +func (StreamDataPacketType) Descriptor() protoreflect.EnumDescriptor { return file_pkgs_grpc_agent_agent_proto_enumTypes[0].Descriptor() } -func (FileDataPacketType) Type() protoreflect.EnumType { +func (StreamDataPacketType) Type() protoreflect.EnumType { return &file_pkgs_grpc_agent_agent_proto_enumTypes[0] } -func (x FileDataPacketType) Number() protoreflect.EnumNumber { +func (x StreamDataPacketType) Number() protoreflect.EnumNumber { return protoreflect.EnumNumber(x) } -// Deprecated: Use FileDataPacketType.Descriptor instead. -func (FileDataPacketType) EnumDescriptor() ([]byte, []int) { +// Deprecated: Use StreamDataPacketType.Descriptor instead. +func (StreamDataPacketType) EnumDescriptor() ([]byte, []int) { return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{0} } @@ -74,8 +77,8 @@ type FileDataPacket struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Type FileDataPacketType `protobuf:"varint,1,opt,name=Type,proto3,enum=FileDataPacketType" json:"Type,omitempty"` - Data []byte `protobuf:"bytes,2,opt,name=Data,proto3" json:"Data,omitempty"` + Type StreamDataPacketType `protobuf:"varint,1,opt,name=Type,proto3,enum=StreamDataPacketType" json:"Type,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=Data,proto3" json:"Data,omitempty"` } func (x *FileDataPacket) Reset() { @@ -110,11 +113,11 @@ func (*FileDataPacket) Descriptor() ([]byte, []int) { return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{0} } -func (x *FileDataPacket) GetType() FileDataPacketType { +func (x *FileDataPacket) GetType() StreamDataPacketType { if x != nil { return x.Type } - return FileDataPacketType_Data + return StreamDataPacketType_EOF } func (x *FileDataPacket) GetData() []byte { @@ -218,31 +221,218 @@ func (x *GetIPFSFileReq) GetFileHash() string { return "" } +// 注:EOF时data也可能有数据 +type StreamDataPacket struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Type StreamDataPacketType `protobuf:"varint,1,opt,name=Type,proto3,enum=StreamDataPacketType" json:"Type,omitempty"` + PlanID string `protobuf:"bytes,2,opt,name=PlanID,proto3" json:"PlanID,omitempty"` + StreamID string `protobuf:"bytes,3,opt,name=StreamID,proto3" json:"StreamID,omitempty"` + Data []byte `protobuf:"bytes,4,opt,name=Data,proto3" json:"Data,omitempty"` +} + +func (x *StreamDataPacket) Reset() { + *x = StreamDataPacket{} + if protoimpl.UnsafeEnabled { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StreamDataPacket) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamDataPacket) ProtoMessage() {} + +func (x *StreamDataPacket) ProtoReflect() protoreflect.Message { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamDataPacket.ProtoReflect.Descriptor instead. +func (*StreamDataPacket) Descriptor() ([]byte, []int) { + return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{3} +} + +func (x *StreamDataPacket) GetType() StreamDataPacketType { + if x != nil { + return x.Type + } + return StreamDataPacketType_EOF +} + +func (x *StreamDataPacket) GetPlanID() string { + if x != nil { + return x.PlanID + } + return "" +} + +func (x *StreamDataPacket) GetStreamID() string { + if x != nil { + return x.StreamID + } + return "" +} + +func (x *StreamDataPacket) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +type SendStreamResp struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *SendStreamResp) Reset() { + *x = SendStreamResp{} + if protoimpl.UnsafeEnabled { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SendStreamResp) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SendStreamResp) ProtoMessage() {} + +func (x *SendStreamResp) ProtoReflect() protoreflect.Message { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SendStreamResp.ProtoReflect.Descriptor instead. +func (*SendStreamResp) Descriptor() ([]byte, []int) { + return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{4} +} + +type FetchStreamReq struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + PlanID string `protobuf:"bytes,1,opt,name=PlanID,proto3" json:"PlanID,omitempty"` + StreamID string `protobuf:"bytes,2,opt,name=StreamID,proto3" json:"StreamID,omitempty"` +} + +func (x *FetchStreamReq) Reset() { + *x = FetchStreamReq{} + if protoimpl.UnsafeEnabled { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *FetchStreamReq) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FetchStreamReq) ProtoMessage() {} + +func (x *FetchStreamReq) ProtoReflect() protoreflect.Message { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FetchStreamReq.ProtoReflect.Descriptor instead. +func (*FetchStreamReq) Descriptor() ([]byte, []int) { + return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{5} +} + +func (x *FetchStreamReq) GetPlanID() string { + if x != nil { + return x.PlanID + } + return "" +} + +func (x *FetchStreamReq) GetStreamID() string { + if x != nil { + return x.StreamID + } + return "" +} + var File_pkgs_grpc_agent_agent_proto protoreflect.FileDescriptor var file_pkgs_grpc_agent_agent_proto_rawDesc = []byte{ 0x0a, 0x1b, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x61, 0x67, 0x65, 0x6e, - 0x74, 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4d, 0x0a, + 0x74, 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4f, 0x0a, 0x0e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x12, - 0x27, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x13, 0x2e, - 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, - 0x70, 0x65, 0x52, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x22, 0x2e, 0x0a, 0x10, - 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, + 0x29, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, + 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, + 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x22, 0x2e, + 0x0a, 0x10, 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, + 0x73, 0x70, 0x12, 0x1a, 0x0a, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x22, 0x2c, + 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x12, 0x1a, 0x0a, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x22, 0x2c, 0x0a, 0x0e, - 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x12, 0x1a, - 0x0a, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x2a, 0x27, 0x0a, 0x12, 0x46, 0x69, - 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, - 0x12, 0x08, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x45, 0x4f, - 0x46, 0x10, 0x01, 0x32, 0x74, 0x0a, 0x05, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, 0x36, 0x0a, 0x0c, - 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, 0x46, - 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x11, 0x2e, - 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, - 0x22, 0x00, 0x28, 0x01, 0x12, 0x33, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46, - 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, - 0x65, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, + 0x28, 0x09, 0x52, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x22, 0x85, 0x01, 0x0a, + 0x10, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, + 0x74, 0x12, 0x29, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, + 0x15, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, + 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x06, + 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x6c, + 0x61, 0x6e, 0x49, 0x44, 0x12, 0x1a, 0x0a, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, + 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, + 0x44, 0x61, 0x74, 0x61, 0x22, 0x10, 0x0a, 0x0e, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x22, 0x44, 0x0a, 0x0e, 0x46, 0x65, 0x74, 0x63, 0x68, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x6c, 0x61, 0x6e, + 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, + 0x12, 0x1a, 0x0a, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x2a, 0x37, 0x0a, 0x14, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, + 0x54, 0x79, 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x45, 0x4f, 0x46, 0x10, 0x00, 0x12, 0x08, 0x0a, + 0x04, 0x44, 0x61, 0x74, 0x61, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x65, 0x6e, 0x64, 0x41, + 0x72, 0x67, 0x73, 0x10, 0x02, 0x32, 0xe1, 0x01, 0x0a, 0x05, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, + 0x36, 0x0a, 0x0c, 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12, + 0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, + 0x1a, 0x11, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, + 0x65, 0x73, 0x70, 0x22, 0x00, 0x28, 0x01, 0x12, 0x33, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x49, 0x50, + 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, + 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, + 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, 0x30, 0x01, 0x12, 0x34, 0x0a, 0x0a, + 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x11, 0x2e, 0x53, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x0f, 0x2e, + 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, + 0x28, 0x01, 0x12, 0x35, 0x0a, 0x0b, 0x46, 0x65, 0x74, 0x63, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x12, 0x0f, 0x2e, 0x46, 0x65, 0x74, 0x63, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, + 0x65, 0x71, 0x1a, 0x11, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, 0x30, 0x01, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x3b, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } @@ -260,24 +450,32 @@ func file_pkgs_grpc_agent_agent_proto_rawDescGZIP() []byte { } var file_pkgs_grpc_agent_agent_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_pkgs_grpc_agent_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_pkgs_grpc_agent_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_pkgs_grpc_agent_agent_proto_goTypes = []interface{}{ - (FileDataPacketType)(0), // 0: FileDataPacketType - (*FileDataPacket)(nil), // 1: FileDataPacket - (*SendIPFSFileResp)(nil), // 2: SendIPFSFileResp - (*GetIPFSFileReq)(nil), // 3: GetIPFSFileReq + (StreamDataPacketType)(0), // 0: StreamDataPacketType + (*FileDataPacket)(nil), // 1: FileDataPacket + (*SendIPFSFileResp)(nil), // 2: SendIPFSFileResp + (*GetIPFSFileReq)(nil), // 3: GetIPFSFileReq + (*StreamDataPacket)(nil), // 4: StreamDataPacket + (*SendStreamResp)(nil), // 5: SendStreamResp + (*FetchStreamReq)(nil), // 6: FetchStreamReq } var file_pkgs_grpc_agent_agent_proto_depIdxs = []int32{ - 0, // 0: FileDataPacket.Type:type_name -> FileDataPacketType - 1, // 1: Agent.SendIPFSFile:input_type -> FileDataPacket - 3, // 2: Agent.GetIPFSFile:input_type -> GetIPFSFileReq - 2, // 3: Agent.SendIPFSFile:output_type -> SendIPFSFileResp - 1, // 4: Agent.GetIPFSFile:output_type -> FileDataPacket - 3, // [3:5] is the sub-list for method output_type - 1, // [1:3] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name + 0, // 0: FileDataPacket.Type:type_name -> StreamDataPacketType + 0, // 1: StreamDataPacket.Type:type_name -> StreamDataPacketType + 1, // 2: Agent.SendIPFSFile:input_type -> FileDataPacket + 3, // 3: Agent.GetIPFSFile:input_type -> GetIPFSFileReq + 4, // 4: Agent.SendStream:input_type -> StreamDataPacket + 6, // 5: Agent.FetchStream:input_type -> FetchStreamReq + 2, // 6: Agent.SendIPFSFile:output_type -> SendIPFSFileResp + 1, // 7: Agent.GetIPFSFile:output_type -> FileDataPacket + 5, // 8: Agent.SendStream:output_type -> SendStreamResp + 4, // 9: Agent.FetchStream:output_type -> StreamDataPacket + 6, // [6:10] is the sub-list for method output_type + 2, // [2:6] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_pkgs_grpc_agent_agent_proto_init() } @@ -322,6 +520,42 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } + file_pkgs_grpc_agent_agent_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StreamDataPacket); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkgs_grpc_agent_agent_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SendStreamResp); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkgs_grpc_agent_agent_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*FetchStreamReq); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -329,7 +563,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_pkgs_grpc_agent_agent_proto_rawDesc, NumEnums: 1, - NumMessages: 3, + NumMessages: 6, NumExtensions: 0, NumServices: 1, }, diff --git a/common/pkgs/grpc/agent/agent.proto b/common/pkgs/grpc/agent/agent.proto index d8f53b4..f756fec 100644 --- a/common/pkgs/grpc/agent/agent.proto +++ b/common/pkgs/grpc/agent/agent.proto @@ -5,13 +5,14 @@ syntax = "proto3"; option go_package = ".;agent";//grpc这里生效了 -enum FileDataPacketType { - Data = 0; - EOF = 1; +enum StreamDataPacketType { + EOF = 0; + Data = 1; + SendArgs = 2; } // 文件数据。注意:只在Type为Data的时候,Data字段才能有数据 message FileDataPacket { - FileDataPacketType Type = 1; + StreamDataPacketType Type = 1; bytes Data = 2; } @@ -23,8 +24,27 @@ message GetIPFSFileReq { string FileHash = 1; } +// 注:EOF时data也可能有数据 +message StreamDataPacket { + StreamDataPacketType Type = 1; + string PlanID = 2; + string StreamID = 3; + bytes Data = 4; +} + +message SendStreamResp { +} + +message FetchStreamReq { + string PlanID = 1; + string StreamID = 2; +} + service Agent { rpc SendIPFSFile(stream FileDataPacket)returns(SendIPFSFileResp){} rpc GetIPFSFile(GetIPFSFileReq)returns(stream FileDataPacket){} + + rpc SendStream(stream StreamDataPacket)returns(SendStreamResp){} + rpc FetchStream(FetchStreamReq)returns(stream StreamDataPacket){} } diff --git a/common/pkgs/grpc/agent/agent_grpc.pb.go b/common/pkgs/grpc/agent/agent_grpc.pb.go index d32adde..95b2f92 100644 --- a/common/pkgs/grpc/agent/agent_grpc.pb.go +++ b/common/pkgs/grpc/agent/agent_grpc.pb.go @@ -23,6 +23,8 @@ const _ = grpc.SupportPackageIsVersion7 const ( Agent_SendIPFSFile_FullMethodName = "/Agent/SendIPFSFile" Agent_GetIPFSFile_FullMethodName = "/Agent/GetIPFSFile" + Agent_SendStream_FullMethodName = "/Agent/SendStream" + Agent_FetchStream_FullMethodName = "/Agent/FetchStream" ) // AgentClient is the client API for Agent service. @@ -31,6 +33,8 @@ const ( type AgentClient interface { SendIPFSFile(ctx context.Context, opts ...grpc.CallOption) (Agent_SendIPFSFileClient, error) GetIPFSFile(ctx context.Context, in *GetIPFSFileReq, opts ...grpc.CallOption) (Agent_GetIPFSFileClient, error) + SendStream(ctx context.Context, opts ...grpc.CallOption) (Agent_SendStreamClient, error) + FetchStream(ctx context.Context, in *FetchStreamReq, opts ...grpc.CallOption) (Agent_FetchStreamClient, error) } type agentClient struct { @@ -107,12 +111,80 @@ func (x *agentGetIPFSFileClient) Recv() (*FileDataPacket, error) { return m, nil } +func (c *agentClient) SendStream(ctx context.Context, opts ...grpc.CallOption) (Agent_SendStreamClient, error) { + stream, err := c.cc.NewStream(ctx, &Agent_ServiceDesc.Streams[2], Agent_SendStream_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &agentSendStreamClient{stream} + return x, nil +} + +type Agent_SendStreamClient interface { + Send(*StreamDataPacket) error + CloseAndRecv() (*SendStreamResp, error) + grpc.ClientStream +} + +type agentSendStreamClient struct { + grpc.ClientStream +} + +func (x *agentSendStreamClient) Send(m *StreamDataPacket) error { + return x.ClientStream.SendMsg(m) +} + +func (x *agentSendStreamClient) CloseAndRecv() (*SendStreamResp, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(SendStreamResp) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *agentClient) FetchStream(ctx context.Context, in *FetchStreamReq, opts ...grpc.CallOption) (Agent_FetchStreamClient, error) { + stream, err := c.cc.NewStream(ctx, &Agent_ServiceDesc.Streams[3], Agent_FetchStream_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &agentFetchStreamClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Agent_FetchStreamClient interface { + Recv() (*StreamDataPacket, error) + grpc.ClientStream +} + +type agentFetchStreamClient struct { + grpc.ClientStream +} + +func (x *agentFetchStreamClient) Recv() (*StreamDataPacket, error) { + m := new(StreamDataPacket) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // AgentServer is the server API for Agent service. // All implementations must embed UnimplementedAgentServer // for forward compatibility type AgentServer interface { SendIPFSFile(Agent_SendIPFSFileServer) error GetIPFSFile(*GetIPFSFileReq, Agent_GetIPFSFileServer) error + SendStream(Agent_SendStreamServer) error + FetchStream(*FetchStreamReq, Agent_FetchStreamServer) error mustEmbedUnimplementedAgentServer() } @@ -126,6 +198,12 @@ func (UnimplementedAgentServer) SendIPFSFile(Agent_SendIPFSFileServer) error { func (UnimplementedAgentServer) GetIPFSFile(*GetIPFSFileReq, Agent_GetIPFSFileServer) error { return status.Errorf(codes.Unimplemented, "method GetIPFSFile not implemented") } +func (UnimplementedAgentServer) SendStream(Agent_SendStreamServer) error { + return status.Errorf(codes.Unimplemented, "method SendStream not implemented") +} +func (UnimplementedAgentServer) FetchStream(*FetchStreamReq, Agent_FetchStreamServer) error { + return status.Errorf(codes.Unimplemented, "method FetchStream not implemented") +} func (UnimplementedAgentServer) mustEmbedUnimplementedAgentServer() {} // UnsafeAgentServer may be embedded to opt out of forward compatibility for this service. @@ -186,6 +264,53 @@ func (x *agentGetIPFSFileServer) Send(m *FileDataPacket) error { return x.ServerStream.SendMsg(m) } +func _Agent_SendStream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(AgentServer).SendStream(&agentSendStreamServer{stream}) +} + +type Agent_SendStreamServer interface { + SendAndClose(*SendStreamResp) error + Recv() (*StreamDataPacket, error) + grpc.ServerStream +} + +type agentSendStreamServer struct { + grpc.ServerStream +} + +func (x *agentSendStreamServer) SendAndClose(m *SendStreamResp) error { + return x.ServerStream.SendMsg(m) +} + +func (x *agentSendStreamServer) Recv() (*StreamDataPacket, error) { + m := new(StreamDataPacket) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _Agent_FetchStream_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(FetchStreamReq) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(AgentServer).FetchStream(m, &agentFetchStreamServer{stream}) +} + +type Agent_FetchStreamServer interface { + Send(*StreamDataPacket) error + grpc.ServerStream +} + +type agentFetchStreamServer struct { + grpc.ServerStream +} + +func (x *agentFetchStreamServer) Send(m *StreamDataPacket) error { + return x.ServerStream.SendMsg(m) +} + // Agent_ServiceDesc is the grpc.ServiceDesc for Agent service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -204,6 +329,16 @@ var Agent_ServiceDesc = grpc.ServiceDesc{ Handler: _Agent_GetIPFSFile_Handler, ServerStreams: true, }, + { + StreamName: "SendStream", + Handler: _Agent_SendStream_Handler, + ClientStreams: true, + }, + { + StreamName: "FetchStream", + Handler: _Agent_FetchStream_Handler, + ServerStreams: true, + }, }, Metadata: "pkgs/grpc/agent/agent.proto", } diff --git a/common/pkgs/grpc/agent/client.go b/common/pkgs/grpc/agent/client.go index e23a6ee..440e49a 100644 --- a/common/pkgs/grpc/agent/client.go +++ b/common/pkgs/grpc/agent/client.go @@ -5,6 +5,7 @@ import ( "fmt" "io" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -37,7 +38,7 @@ func (c *Client) SendIPFSFile(file io.Reader) (string, error) { rd, err := file.Read(buf) if err == io.EOF { err := sendCli.Send(&FileDataPacket{ - Type: FileDataPacketType_EOF, + Type: StreamDataPacketType_EOF, Data: buf[:rd], }) if err != nil { @@ -57,7 +58,7 @@ func (c *Client) SendIPFSFile(file io.Reader) (string, error) { } err = sendCli.Send(&FileDataPacket{ - Type: FileDataPacketType_Data, + Type: StreamDataPacketType_Data, Data: buf[:rd], }) if err != nil { @@ -68,7 +69,9 @@ func (c *Client) SendIPFSFile(file io.Reader) (string, error) { type fileReadCloser struct { io.ReadCloser - stream Agent_GetIPFSFileClient + // stream Agent_GetIPFSFileClient + // TODO 临时使用 + recvFn func() (*StreamDataPacket, error) cancelFn context.CancelFunc readingData []byte recvEOF bool @@ -76,15 +79,15 @@ type fileReadCloser struct { func (s *fileReadCloser) Read(p []byte) (int, error) { if len(s.readingData) == 0 && !s.recvEOF { - resp, err := s.stream.Recv() + resp, err := s.recvFn() if err != nil { return 0, err } - if resp.Type == FileDataPacketType_Data { + if resp.Type == StreamDataPacketType_Data { s.readingData = resp.Data - } else if resp.Type == FileDataPacketType_EOF { + } else if resp.Type == StreamDataPacketType_EOF { s.readingData = resp.Data s.recvEOF = true @@ -121,7 +124,87 @@ func (c *Client) GetIPFSFile(fileHash string) (io.ReadCloser, error) { } return &fileReadCloser{ - stream: stream, + // TODO 临时处理方案 + recvFn: func() (*StreamDataPacket, error) { + pkt, err := stream.Recv() + if err != nil { + return nil, err + } + + return &StreamDataPacket{ + Type: pkt.Type, + Data: pkt.Data, + }, nil + }, + cancelFn: cancel, + }, nil +} + +func (c *Client) SendStream(planID ioswitch.PlanID, streamID ioswitch.StreamID, file io.Reader) error { + sendCli, err := c.cli.SendStream(context.Background()) + if err != nil { + return err + } + + err = sendCli.Send(&StreamDataPacket{ + Type: StreamDataPacketType_SendArgs, + PlanID: string(planID), + StreamID: string(streamID), + }) + if err != nil { + return fmt.Errorf("sending stream id packet: %w", err) + } + + buf := make([]byte, 4096) + for { + rd, err := file.Read(buf) + if err == io.EOF { + err := sendCli.Send(&StreamDataPacket{ + Type: StreamDataPacketType_EOF, + StreamID: string(streamID), + Data: buf[:rd], + }) + if err != nil { + return fmt.Errorf("sending EOF packet: %w", err) + } + + _, err = sendCli.CloseAndRecv() + if err != nil { + return fmt.Errorf("receiving response: %w", err) + } + + return nil + } + + if err != nil { + return fmt.Errorf("reading file data: %w", err) + } + + err = sendCli.Send(&StreamDataPacket{ + Type: StreamDataPacketType_Data, + StreamID: string(streamID), + Data: buf[:rd], + }) + if err != nil { + return fmt.Errorf("sending data packet: %w", err) + } + } +} + +func (c *Client) FetchStream(planID ioswitch.PlanID, streamID ioswitch.StreamID) (io.ReadCloser, error) { + ctx, cancel := context.WithCancel(context.Background()) + + stream, err := c.cli.FetchStream(ctx, &FetchStreamReq{ + PlanID: string(planID), + StreamID: string(streamID), + }) + if err != nil { + cancel() + return nil, fmt.Errorf("request grpc failed, err: %w", err) + } + + return &fileReadCloser{ + recvFn: stream.Recv, cancelFn: cancel, }, nil } diff --git a/common/pkgs/ioswitch/ioswitch.go b/common/pkgs/ioswitch/ioswitch.go new file mode 100644 index 0000000..1c8fc34 --- /dev/null +++ b/common/pkgs/ioswitch/ioswitch.go @@ -0,0 +1,35 @@ +package ioswitch + +import ( + "io" +) + +type PlanID string + +type StreamID string + +type Plan struct { + ID PlanID + Ops []Op +} + +type Stream struct { + ID StreamID + Stream io.ReadCloser +} + +func NewStream(id StreamID, stream io.ReadCloser) Stream { + return Stream{ + ID: id, + Stream: stream, + } +} + +type Op interface { + Execute(sw *Switch, planID PlanID) error +} + +type ResultKV struct { + Key string + Value any +} diff --git a/common/pkgs/ioswitch/ops/ops.go b/common/pkgs/ioswitch/ops/ops.go new file mode 100644 index 0000000..48d1b6e --- /dev/null +++ b/common/pkgs/ioswitch/ops/ops.go @@ -0,0 +1,241 @@ +package ops + +import ( + "context" + "fmt" + "io" + "sync" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/types" + myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/common/utils/serder" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage/common/pkgs/ec" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[ioswitch.Op]( + (*IPFSRead)(nil), + (*IPFSWrite)(nil), + (*GRPCSend)(nil), + (*GRPCFetch)(nil), + (*ECCompute)(nil), + (*Join)(nil), +))) + +type IPFSRead struct { + Output ioswitch.StreamID `json:"output"` + FileHash string `json:"fileHash"` +} + +func (o *IPFSRead) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + logger. + WithField("FileHash", o.FileHash). + WithField("Output", o.Output). + Debugf("ipfs read op") + defer logger.Debugf("ipfs read op finished") + + ipfsCli, err := stgglb.IPFSPool.Acquire() + if err != nil { + return fmt.Errorf("new ipfs client: %w", err) + } + defer stgglb.IPFSPool.Release(ipfsCli) + + file, err := ipfsCli.OpenRead(o.FileHash) + if err != nil { + return fmt.Errorf("reading ipfs: %w", err) + } + + fut := future.NewSetVoid() + file = myio.AfterReadClosed(file, func(closer io.ReadCloser) { + fut.SetVoid() + }) + + sw.StreamReady(planID, ioswitch.NewStream(o.Output, file)) + + // TODO context + fut.Wait(context.TODO()) + return nil +} + +type IPFSWrite struct { + Input ioswitch.StreamID `json:"input"` + ResultKey string `json:"resultKey"` +} + +func (o *IPFSWrite) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + logger. + WithField("ResultKey", o.ResultKey). + WithField("Input", o.Input). + Debugf("ipfs write op") + + ipfsCli, err := stgglb.IPFSPool.Acquire() + if err != nil { + return fmt.Errorf("new ipfs client: %w", err) + } + defer stgglb.IPFSPool.Release(ipfsCli) + + strs, err := sw.WaitStreams(planID, o.Input) + if err != nil { + return err + } + defer strs[0].Stream.Close() + + fileHash, err := ipfsCli.CreateFile(strs[0].Stream) + if err != nil { + return fmt.Errorf("creating ipfs file: %w", err) + } + + if o.ResultKey != "" { + sw.AddResultValue(planID, ioswitch.ResultKV{ + Key: o.ResultKey, + Value: fileHash, + }) + } + + return nil +} + +type GRPCSend struct { + StreamID ioswitch.StreamID `json:"streamID"` + Node model.Node `json:"node"` +} + +func (o *GRPCSend) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + logger. + WithField("ioswitch.StreamID", o.StreamID). + Debugf("grpc send") + + strs, err := sw.WaitStreams(planID, o.StreamID) + if err != nil { + return err + } + defer strs[0].Stream.Close() + + // TODO 根据客户端地址选择IP和端口 + agtCli, err := stgglb.AgentRPCPool.Acquire(o.Node.ExternalIP, o.Node.ExternalGRPCPort) + if err != nil { + return fmt.Errorf("new agent rpc client: %w", err) + } + defer stgglb.AgentRPCPool.Release(agtCli) + + err = agtCli.SendStream(planID, o.StreamID, strs[0].Stream) + if err != nil { + return fmt.Errorf("sending stream: %w", err) + } + + return nil +} + +type GRPCFetch struct { + StreamID ioswitch.StreamID `json:"streamID"` + Node model.Node `json:"node"` +} + +func (o *GRPCFetch) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + // TODO 根据客户端地址选择IP和端口 + agtCli, err := stgglb.AgentRPCPool.Acquire(o.Node.ExternalIP, o.Node.ExternalGRPCPort) + if err != nil { + return fmt.Errorf("new agent rpc client: %w", err) + } + defer stgglb.AgentRPCPool.Release(agtCli) + + str, err := agtCli.FetchStream(planID, o.StreamID) + if err != nil { + return fmt.Errorf("fetching stream: %w", err) + } + + fut := future.NewSetVoid() + str = myio.AfterReadClosed(str, func(closer io.ReadCloser) { + fut.SetVoid() + }) + + sw.StreamReady(planID, ioswitch.NewStream(o.StreamID, str)) + + // TODO + fut.Wait(context.TODO()) + + return err +} + +type ECCompute struct { + EC stgmod.EC `json:"ec"` + InputIDs []ioswitch.StreamID `json:"inputIDs"` + OutputIDs []ioswitch.StreamID `json:"outputIDs"` + InputBlockIndexes []int `json:"inputBlockIndexes"` + OutputBlockIndexes []int `json:"outputBlockIndexes"` +} + +func (o *ECCompute) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + rs, err := ec.NewRs(o.EC.K, o.EC.N, o.EC.ChunkSize) + if err != nil { + return fmt.Errorf("new ec: %w", err) + } + + strs, err := sw.WaitStreams(planID, o.InputIDs...) + if err != nil { + return err + } + defer func() { + for _, s := range strs { + s.Stream.Close() + } + }() + + var inputs []io.Reader + for _, s := range strs { + inputs = append(inputs, s.Stream) + } + + outputs := rs.ReconstructSome(inputs, o.InputBlockIndexes, o.OutputBlockIndexes) + + wg := sync.WaitGroup{} + for i, id := range o.OutputIDs { + wg.Add(1) + sw.StreamReady(planID, ioswitch.NewStream(id, myio.AfterReadClosed(outputs[i], func(closer io.ReadCloser) { + wg.Done() + }))) + } + wg.Wait() + + return nil +} + +type Join struct { + InputIDs []ioswitch.StreamID `json:"inputIDs"` + OutputID ioswitch.StreamID `json:"outputID"` + Length int64 `json:"length"` +} + +func (o *Join) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + strs, err := sw.WaitStreams(planID, o.InputIDs...) + if err != nil { + return err + } + + var strReaders []io.Reader + for _, s := range strs { + strReaders = append(strReaders, s.Stream) + } + defer func() { + for _, str := range strs { + str.Stream.Close() + } + }() + + fut := future.NewSetVoid() + sw.StreamReady(planID, + ioswitch.NewStream(o.OutputID, + myio.AfterReadClosed(myio.Length(myio.Join(strReaders), o.Length), func(closer io.ReadCloser) { + fut.SetVoid() + }), + ), + ) + + fut.Wait(context.TODO()) + return nil +} diff --git a/common/pkgs/ioswitch/plans/executor.go b/common/pkgs/ioswitch/plans/executor.go new file mode 100644 index 0000000..cc23209 --- /dev/null +++ b/common/pkgs/ioswitch/plans/executor.go @@ -0,0 +1,177 @@ +package plans + +import ( + "context" + "errors" + "fmt" + "io" + "sync" + "sync/atomic" + + "gitlink.org.cn/cloudream/common/pkgs/future" + myio "gitlink.org.cn/cloudream/common/utils/io" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" +) + +type ExecutorResult struct { + ResultValues map[string]any +} + +type Executor struct { + plan ComposedPlan + callback *future.SetValueFuture[ExecutorResult] + mqClis []*agtmq.Client + planTaskIDs []string +} + +func Execute(plan ComposedPlan) (*Executor, error) { + executor := Executor{ + plan: plan, + callback: future.NewSetValue[ExecutorResult](), + } + + var err error + for _, a := range plan.AgentPlans { + var cli *agtmq.Client + cli, err = stgglb.AgentMQPool.Acquire(a.Node.NodeID) + if err != nil { + executor.Close() + return nil, fmt.Errorf("new mq client for %d: %w", a.Node.NodeID, err) + } + + executor.mqClis = append(executor.mqClis, cli) + } + + for i, a := range plan.AgentPlans { + cli := executor.mqClis[i] + + _, err := cli.SetupIOPlan(agtmq.NewSetupIOPlan(a.Plan)) + if err != nil { + for i -= 1; i >= 0; i-- { + executor.mqClis[i].CancelIOPlan(agtmq.NewCancelIOPlan(plan.ID)) + } + executor.Close() + return nil, fmt.Errorf("setup plan at %d: %w", a.Node.NodeID, err) + } + } + + for i, a := range plan.AgentPlans { + cli := executor.mqClis[i] + + resp, err := cli.StartIOPlan(agtmq.NewStartIOPlan(a.Plan.ID)) + if err != nil { + executor.cancelAll() + executor.Close() + return nil, fmt.Errorf("setup plan at %d: %w", a.Node.NodeID, err) + } + + executor.planTaskIDs = append(executor.planTaskIDs, resp.TaskID) + } + + go executor.pollResult() + + return &executor, nil +} + +func (e *Executor) SendStream(info *FromExecutorStream, stream io.Reader) error { + // TODO 根据地域选择IP + agtCli, err := stgglb.AgentRPCPool.Acquire(info.toNode.ExternalIP, info.toNode.ExternalGRPCPort) + if err != nil { + return fmt.Errorf("new agent rpc client: %w", err) + } + defer stgglb.AgentRPCPool.Release(agtCli) + + return agtCli.SendStream(e.plan.ID, info.info.ID, stream) +} + +func (e *Executor) ReadStream(info *ToExecutorStream) (io.ReadCloser, error) { + // TODO 根据地域选择IP + agtCli, err := stgglb.AgentRPCPool.Acquire(info.fromNode.ExternalIP, info.fromNode.ExternalGRPCPort) + if err != nil { + return nil, fmt.Errorf("new agent rpc client: %w", err) + } + + str, err := agtCli.FetchStream(e.plan.ID, info.info.ID) + if err != nil { + return nil, err + } + + return myio.AfterReadClosed(str, func(closer io.ReadCloser) { + stgglb.AgentRPCPool.Release(agtCli) + }), nil +} + +func (e *Executor) Wait() (ExecutorResult, error) { + return e.callback.WaitValue(context.TODO()) +} + +func (e *Executor) cancelAll() { + for _, cli := range e.mqClis { + cli.CancelIOPlan(agtmq.NewCancelIOPlan(e.plan.ID)) + } +} + +func (e *Executor) Close() { + for _, c := range e.mqClis { + stgglb.AgentMQPool.Release(c) + } +} + +func (e *Executor) pollResult() { + wg := sync.WaitGroup{} + var anyErr error + var done atomic.Bool + rets := make([]*ioswitch.PlanResult, len(e.plan.AgentPlans)) + + for i, id := range e.planTaskIDs { + idx := i + taskID := id + + wg.Add(1) + go func() { + defer wg.Done() + + for { + resp, err := e.mqClis[idx].WaitIOPlan(agtmq.NewWaitIOPlan(taskID, 5000)) + if err != nil { + anyErr = err + break + } + + if resp.IsComplete { + if resp.Error != "" { + anyErr = errors.New(resp.Error) + done.Store(true) + } else { + rets[idx] = &resp.Result + } + break + } + + if done.Load() { + break + } + } + }() + } + + wg.Wait() + + if anyErr != nil { + e.callback.SetError(anyErr) + return + } + + reducedRet := ExecutorResult{ + ResultValues: make(map[string]any), + } + for _, ret := range rets { + for k, v := range ret.Values { + reducedRet.ResultValues[k] = v + } + } + + e.callback.SetValue(reducedRet) +} diff --git a/common/pkgs/ioswitch/plans/plan_builder.go b/common/pkgs/ioswitch/plans/plan_builder.go new file mode 100644 index 0000000..8d0ab13 --- /dev/null +++ b/common/pkgs/ioswitch/plans/plan_builder.go @@ -0,0 +1,230 @@ +package plans + +import ( + "fmt" + + "github.com/google/uuid" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops" +) + +type StreamInfo struct { + ID ioswitch.StreamID +} + +type PlanBuilder struct { + streams []*StreamInfo + agentPlans map[int64]*AgentPlanBuilder +} + +func (b *PlanBuilder) Build() (*ComposedPlan, error) { + planID := uuid.NewString() + + var agentPlans []AgentPlan + for _, b := range b.agentPlans { + plan, err := b.Build(ioswitch.PlanID(planID)) + if err != nil { + return nil, err + } + + agentPlans = append(agentPlans, plan) + } + + return &ComposedPlan{ + ID: ioswitch.PlanID(planID), + AgentPlans: agentPlans, + }, nil +} + +func (b *PlanBuilder) newStream() *StreamInfo { + str := &StreamInfo{ + ID: ioswitch.StreamID(fmt.Sprintf("%d", len(b.streams)+1)), + } + + b.streams = append(b.streams, str) + + return str +} + +func NewPlanBuilder() PlanBuilder { + return PlanBuilder{ + agentPlans: make(map[int64]*AgentPlanBuilder), + } +} + +func (b *PlanBuilder) FromExecutor() *FromExecutorStream { + return &FromExecutorStream{ + owner: b, + info: b.newStream(), + } +} + +func (b *PlanBuilder) AtAgent(node model.Node) *AgentPlanBuilder { + agtPlan, ok := b.agentPlans[node.NodeID] + if !ok { + agtPlan = &AgentPlanBuilder{ + owner: b, + node: node, + } + b.agentPlans[node.NodeID] = agtPlan + } + + return agtPlan +} + +type FromExecutorStream struct { + owner *PlanBuilder + info *StreamInfo + toNode *model.Node +} + +func (s *FromExecutorStream) ToNode(node model.Node) *AgentStream { + s.toNode = &node + return &AgentStream{ + owner: s.owner.AtAgent(node), + info: s.info, + } +} + +type ToExecutorStream struct { + info *StreamInfo + fromNode *model.Node +} + +type AgentStream struct { + owner *AgentPlanBuilder + info *StreamInfo +} + +func (s *AgentStream) IPFSWrite(resultKey string) { + s.owner.ops = append(s.owner.ops, &ops.IPFSWrite{ + Input: s.info.ID, + ResultKey: resultKey, + }) +} + +func (s *AgentStream) GRPCSend(node model.Node) *AgentStream { + agtStr := &AgentStream{ + owner: s.owner.owner.AtAgent(node), + info: s.info, + } + + s.owner.ops = append(s.owner.ops, &ops.GRPCSend{ + StreamID: s.info.ID, + Node: node, + }) + + return agtStr +} + +func (s *AgentStream) ToExecutor() *ToExecutorStream { + return &ToExecutorStream{ + info: s.info, + fromNode: &s.owner.node, + } +} + +type AgentPlanBuilder struct { + owner *PlanBuilder + node model.Node + ops []ioswitch.Op +} + +func (b *AgentPlanBuilder) GRCPFetch(node model.Node) *AgentStream { + agtStr := &AgentStream{ + owner: b, + info: b.owner.newStream(), + } + + b.ops = append(b.ops, &ops.GRPCFetch{ + StreamID: agtStr.info.ID, + Node: node, + }) + + return agtStr +} + +func (b *AgentPlanBuilder) IPFSRead(fileHash string) *AgentStream { + agtStr := &AgentStream{ + owner: b, + info: b.owner.newStream(), + } + + b.ops = append(b.ops, &ops.IPFSRead{ + Output: agtStr.info.ID, + FileHash: fileHash, + }) + + return agtStr +} + +func (b *AgentPlanBuilder) ECCompute(ec stgmod.EC, inBlockIndexes []int, outBlockIndexes []int, streams ...*AgentStream) *MultiStream { + mstr := &MultiStream{} + + var inputStrIDs []ioswitch.StreamID + for _, str := range streams { + inputStrIDs = append(inputStrIDs, str.info.ID) + } + + var outputStrIDs []ioswitch.StreamID + for i := 0; i < ec.N-ec.K; i++ { + info := b.owner.newStream() + mstr.streams[i] = &AgentStream{ + owner: b, + info: info, + } + outputStrIDs = append(outputStrIDs, info.ID) + } + + b.ops = append(b.ops, &ops.ECCompute{ + EC: ec, + InputIDs: inputStrIDs, + OutputIDs: outputStrIDs, + InputBlockIndexes: inBlockIndexes, + OutputBlockIndexes: outBlockIndexes, + }) + + return mstr +} + +func (b *AgentPlanBuilder) Join(length int64, streams ...*AgentStream) *AgentStream { + agtStr := &AgentStream{ + owner: b, + info: b.owner.newStream(), + } + + var inputStrIDs []ioswitch.StreamID + for _, str := range streams { + inputStrIDs = append(inputStrIDs, str.info.ID) + } + + b.ops = append(b.ops, &ops.Join{ + InputIDs: inputStrIDs, + OutputID: agtStr.info.ID, + Length: length, + }) + + return agtStr +} + +func (b *AgentPlanBuilder) Build(planID ioswitch.PlanID) (AgentPlan, error) { + plan := ioswitch.Plan{ + ID: planID, + Ops: b.ops, + } + + return AgentPlan{ + Plan: plan, + Node: b.node, + }, nil +} + +type MultiStream struct { + streams []*AgentStream +} + +func (m *MultiStream) Stream(index int) *AgentStream { + return m.streams[index] +} diff --git a/common/pkgs/ioswitch/plans/plans.go b/common/pkgs/ioswitch/plans/plans.go new file mode 100644 index 0000000..3ea9e8e --- /dev/null +++ b/common/pkgs/ioswitch/plans/plans.go @@ -0,0 +1,16 @@ +package plans + +import ( + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +type AgentPlan struct { + Node model.Node + Plan ioswitch.Plan +} + +type ComposedPlan struct { + ID ioswitch.PlanID + AgentPlans []AgentPlan +} diff --git a/common/pkgs/ioswitch/switch.go b/common/pkgs/ioswitch/switch.go new file mode 100644 index 0000000..aa1c584 --- /dev/null +++ b/common/pkgs/ioswitch/switch.go @@ -0,0 +1,294 @@ +package ioswitch + +import ( + "context" + "errors" + "fmt" + "sync" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/utils/lo" +) + +var ErrPlanFinished = errors.New("plan is finished") + +var ErrPlanNotFound = errors.New("plan not found") + +type OpState string + +const ( + OpPending OpState = "Pending" + OpFinished OpState = "Finished" +) + +type Oping struct { + State OpState +} + +type PlanResult struct { + Values map[string]any `json:"values"` +} + +type Planning struct { + plan Plan + opings []Oping + resultValues map[string]any + callback *future.SetValueFuture[PlanResult] + + readys map[StreamID]Stream + waittings []*Watting +} + +func NewPlanning(plan Plan) Planning { + planning := Planning{ + plan: plan, + resultValues: make(map[string]any), + callback: future.NewSetValue[PlanResult](), + readys: make(map[StreamID]Stream), + } + + for range plan.Ops { + oping := Oping{ + State: OpPending, + } + planning.opings = append(planning.opings, oping) + } + + return planning +} + +func (p *Planning) IsCompleted() bool { + for _, oping := range p.opings { + if oping.State != OpFinished { + return false + } + } + + return true +} + +func (p *Planning) MakeResult() PlanResult { + return PlanResult{ + Values: p.resultValues, + } +} + +type Watting struct { + WaitIDs []StreamID + Readys []Stream + Callback *future.SetValueFuture[[]Stream] +} + +func (w *Watting) TryReady(str Stream) bool { + for i, id := range w.WaitIDs { + if id == str.ID { + w.Readys[i] = str + return true + } + } + + return false +} + +func (c *Watting) IsAllReady() bool { + for _, s := range c.Readys { + if s.Stream == nil { + return false + } + } + + return true +} + +func (w *Watting) Complete() { + w.Callback.SetValue(w.Readys) +} + +func (w *Watting) Cancel(err error) { + w.Callback.SetError(err) +} + +type Switch struct { + lock sync.Mutex + plannings map[PlanID]*Planning +} + +func NewSwitch() Switch { + return Switch{ + plannings: make(map[PlanID]*Planning), + } +} + +func (s *Switch) SetupPlan(plan Plan) error { + s.lock.Lock() + defer s.lock.Unlock() + + if _, ok := s.plannings[plan.ID]; ok { + return fmt.Errorf("plan id exists") + } + + planning := NewPlanning(plan) + s.plannings[plan.ID] = &planning + return nil +} + +func (s *Switch) ExecutePlan(id PlanID) (PlanResult, error) { + s.lock.Lock() + + planning, ok := s.plannings[id] + if !ok { + s.lock.Unlock() + return PlanResult{}, fmt.Errorf("plan not found") + } + + for i, op := range planning.plan.Ops { + idx := i + o := op + go func() { + err := o.Execute(s, id) + + s.lock.Lock() + defer s.lock.Unlock() + + if err != nil { + logger.Std.Warnf("exeucting op: %s", err.Error()) + s.cancelPlan(id) + return + } + + planning.opings[idx].State = OpFinished + if planning.IsCompleted() { + s.completePlan(id) + } + }() + } + s.lock.Unlock() + + return planning.callback.WaitValue(context.TODO()) +} + +func (s *Switch) CancelPlan(id PlanID) { + s.lock.Lock() + defer s.lock.Unlock() + + s.cancelPlan(id) +} + +func (s *Switch) cancelPlan(id PlanID) { + plan, ok := s.plannings[id] + if !ok { + return + } + + delete(s.plannings, id) + + for _, s := range plan.readys { + s.Stream.Close() + } + + for _, c := range plan.waittings { + c.Callback.SetError(ErrPlanFinished) + } + + plan.callback.SetError(fmt.Errorf("plan cancelled")) +} + +func (s *Switch) completePlan(id PlanID) { + plan, ok := s.plannings[id] + if !ok { + return + } + + delete(s.plannings, id) + + for _, s := range plan.readys { + s.Stream.Close() + } + + for _, c := range plan.waittings { + c.Callback.SetError(ErrPlanFinished) + } + + plan.callback.SetValue(plan.MakeResult()) +} + +func (s *Switch) StreamReady(planID PlanID, stream Stream) { + s.lock.Lock() + defer s.lock.Unlock() + + plan, ok := s.plannings[planID] + if !ok { + //TODO 处理错误 + return + } + + for i, wa := range plan.waittings { + if !wa.TryReady(stream) { + continue + } + + if !wa.IsAllReady() { + return + } + + plan.waittings = lo.RemoveAt(plan.waittings, i) + wa.Complete() + return + } + + plan.readys[stream.ID] = stream +} + +func (s *Switch) WaitStreams(planID PlanID, streamIDs ...StreamID) ([]Stream, error) { + s.lock.Lock() + + plan, ok := s.plannings[planID] + if !ok { + s.lock.Unlock() + return nil, ErrPlanNotFound + } + + allReady := true + readys := make([]Stream, len(streamIDs)) + for i, id := range streamIDs { + str, ok := plan.readys[id] + if !ok { + allReady = false + continue + } + + readys[i] = str + delete(plan.readys, id) + } + + if allReady { + s.lock.Unlock() + return readys, nil + } + + callback := future.NewSetValue[[]Stream]() + + plan.waittings = append(plan.waittings, &Watting{ + WaitIDs: streamIDs, + Readys: readys, + Callback: callback, + }) + s.lock.Unlock() + + return callback.WaitValue(context.TODO()) +} + +func (s *Switch) AddResultValue(planID PlanID, rets ...ResultKV) { + s.lock.Lock() + defer s.lock.Unlock() + + plan, ok := s.plannings[planID] + if !ok { + return + } + + for _, ret := range rets { + plan.resultValues[ret.Key] = ret.Value + } +} diff --git a/common/pkgs/iterator/ec_object_iterator.go b/common/pkgs/iterator/ec_object_iterator.go index 5f34e08..97b9ae1 100644 --- a/common/pkgs/iterator/ec_object_iterator.go +++ b/common/pkgs/iterator/ec_object_iterator.go @@ -4,13 +4,12 @@ import ( "fmt" "io" "math/rand" - "os" "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + myio "gitlink.org.cn/cloudream/common/utils/io" stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmodels "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" @@ -73,19 +72,23 @@ func (iter *ECObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingOb obj := iter.objects[iter.currentIndex] ecData := iter.objectECData[iter.currentIndex] - blocks := ecData.Blocks - ec := iter.ec - ecK := ec.EcK - ecN := ec.EcN //采取直接读,优先选内网节点 - hashs := make([]string, ecK) - nds := make([]DownloadNodeInfo, ecK) - for i := 0; i < ecK; i++ { - hashs[i] = blocks[i].FileHash + var chosenNodes []DownloadNodeInfo + var chosenBlocks []stgmodels.ObjectBlockData + for i := range ecData.Blocks { + if len(chosenBlocks) == iter.ec.EcK { + break + } + + // 块没有被任何节点缓存或者获取失败都没关系,只要能获取到k个块的信息就行 + + if len(ecData.Blocks[i].NodeIDs) == 0 { + continue + } - getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(blocks[i].NodeIDs)) + getNodesResp, err := coorCli.GetNodes(coormq.NewGetNodes(ecData.Blocks[i].NodeIDs)) if err != nil { - return nil, fmt.Errorf("getting nodes: %w", err) + continue } downloadNodes := lo.Map(getNodesResp.Nodes, func(node model.Node, index int) DownloadNodeInfo { @@ -95,36 +98,23 @@ func (iter *ECObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingOb } }) - nds[i] = iter.chooseDownloadNode(downloadNodes) - } + chosenBlocks = append(chosenBlocks, ecData.Blocks[i]) + chosenNodes = append(chosenNodes, iter.chooseDownloadNode(downloadNodes)) - //nodeIDs, nodeIPs直接按照第1~ecK个排列 - nodeIDs := make([]int64, ecK) - nodeIPs := make([]string, ecK) - grpcPorts := make([]int, ecK) - for i := 0; i < ecK; i++ { - nodeIDs[i] = nds[i].Node.NodeID - nodeIPs[i] = nds[i].Node.ExternalIP - grpcPorts[i] = nds[i].Node.ExternalGRPCPort - if nds[i].IsSameLocation { - nodeIPs[i] = nds[i].Node.LocalIP - grpcPorts[i] = nds[i].Node.LocalGRPCPort - logger.Infof("client and node %d are at the same location, use local ip", nds[i].Node.NodeID) - } } - fileSize := obj.Size - blockIDs := make([]int, ecK) - for i := 0; i < ecK; i++ { - blockIDs[i] = i + if len(chosenBlocks) < iter.ec.EcK { + return nil, fmt.Errorf("no enough blocks to reconstruct the file, want %d, get only %d", iter.ec.EcK, len(chosenBlocks)) } - reader, err := iter.downloadEcObject(fileSize, ecK, ecN, blockIDs, nodeIDs, nodeIPs, grpcPorts, hashs) + + reader, err := iter.downloadEcObject(iter.downloadCtx, obj.Size, chosenNodes, chosenBlocks) if err != nil { return nil, fmt.Errorf("ec read failed, err: %w", err) } return &IterDownloadingObject{ - File: reader, + Object: obj, + File: reader, }, nil } @@ -146,87 +136,36 @@ func (i *ECObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) Downlo return entries[rand.Intn(len(entries))] } -func (iter *ECObjectIterator) downloadEcObject(fileSize int64, ecK int, ecN int, blockIDs []int, nodeIDs []int64, nodeIPs []string, grpcPorts []int, hashs []string) (io.ReadCloser, error) { - // TODO zkx 先试用同步方式实现逻辑,做好错误处理。同时也方便下面直接使用uploadToNode和uploadToLocalIPFS来优化代码结构 - //wg := sync.WaitGroup{} - numPacket := (fileSize + int64(ecK)*iter.ecInfo.PacketSize - 1) / (int64(ecK) * iter.ecInfo.PacketSize) - getBufs := make([]chan []byte, ecN) - decodeBufs := make([]chan []byte, ecK) - for i := 0; i < ecN; i++ { - getBufs[i] = make(chan []byte) - } - for i := 0; i < ecK; i++ { - decodeBufs[i] = make(chan []byte) - } - for idx := 0; idx < len(blockIDs); idx++ { - i := idx - go func() { - // TODO 处理错误 - file, _ := downloadFile(iter.downloadCtx, nodeIDs[i], nodeIPs[i], grpcPorts[i], hashs[i]) - - for p := int64(0); p < numPacket; p++ { - buf := make([]byte, iter.ecInfo.PacketSize) - // TODO 处理错误 - io.ReadFull(file, buf) - getBufs[blockIDs[i]] <- buf - } - }() - } - print(numPacket) - go decode(getBufs[:], decodeBufs[:], blockIDs, ecK, numPacket) - r, w := io.Pipe() - //persist函数,将解码得到的文件写入pipe - go func() { - for i := 0; int64(i) < numPacket; i++ { - for j := 0; j < len(decodeBufs); j++ { - tmp := <-decodeBufs[j] - _, err := w.Write(tmp) - if err != nil { - fmt.Errorf("persist file falied, err:%w", err) - } - } - } - w.Close() - }() - return r, nil -} +func (iter *ECObjectIterator) downloadEcObject(ctx *DownloadContext, fileSize int64, nodes []DownloadNodeInfo, blocks []stgmodels.ObjectBlockData) (io.ReadCloser, error) { + var fileStrs []io.ReadCloser -func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int, numPacket int64) { - fmt.Println("decode ") - var tmpIn [][]byte - var zeroPkt []byte - tmpIn = make([][]byte, len(inBufs)) - hasBlock := map[int]bool{} - for j := 0; j < len(blockSeq); j++ { - hasBlock[blockSeq[j]] = true - } - needRepair := false //检测是否传入了所有数据块 - for j := 0; j < len(outBufs); j++ { - if blockSeq[j] != j { - needRepair = true - } + rs, err := ec.NewRs(iter.ec.EcK, iter.ec.EcN, iter.ecInfo.ChunkSize) + if err != nil { + return nil, fmt.Errorf("new rs: %w", err) } - enc := ec.NewRsEnc(ecK, len(inBufs)) - for i := 0; int64(i) < numPacket; i++ { - print("!!!!!") - for j := 0; j < len(inBufs); j++ { - if hasBlock[j] { - tmpIn[j] = <-inBufs[j] - } else { - tmpIn[j] = zeroPkt - } - } - if needRepair { - err := enc.Repair(tmpIn) - if err != nil { - fmt.Fprintf(os.Stderr, "Decode Repair Error: %s", err.Error()) + + for i := range blocks { + str, err := downloadFile(ctx, nodes[i], blocks[i].FileHash) + if err != nil { + for i -= 1; i >= 0; i-- { + fileStrs[i].Close() } + return nil, fmt.Errorf("donwloading file: %w", err) } - for j := 0; j < len(outBufs); j++ { - outBufs[j] <- tmpIn[j] - } + + fileStrs = append(fileStrs, str) } - for i := 0; i < len(outBufs); i++ { - close(outBufs[i]) + + fileReaders, filesCloser := myio.ToReaders(fileStrs) + + var indexes []int + for _, b := range blocks { + indexes = append(indexes, b.Index) } + + outputs, outputsCloser := myio.ToReaders(rs.ReconstructData(fileReaders, indexes)) + return myio.AfterReadClosed(myio.Length(myio.ChunkedJoin(outputs, int(iter.ecInfo.ChunkSize)), fileSize), func(c io.ReadCloser) { + filesCloser() + outputsCloser() + }), nil } diff --git a/common/pkgs/iterator/rep_object_iterator.go b/common/pkgs/iterator/rep_object_iterator.go index f1ac0b2..de2504f 100644 --- a/common/pkgs/iterator/rep_object_iterator.go +++ b/common/pkgs/iterator/rep_object_iterator.go @@ -99,20 +99,7 @@ func (i *RepObjectIterator) doMove(coorCli *coormq.Client) (*IterDownloadingObje } }) - // 选择下载节点 - downloadNode := i.chooseDownloadNode(downloadNodes) - - // 如果客户端与节点在同一个地域,则使用内网地址连接节点 - nodeIP := downloadNode.Node.ExternalIP - grpcPort := downloadNode.Node.ExternalGRPCPort - if downloadNode.IsSameLocation { - nodeIP = downloadNode.Node.LocalIP - grpcPort = downloadNode.Node.LocalGRPCPort - - logger.Infof("client and node %d are at the same location, use local ip", downloadNode.Node.NodeID) - } - - reader, err := downloadFile(i.downloadCtx, downloadNode.Node.NodeID, nodeIP, grpcPort, repData.FileHash) + reader, err := downloadFile(i.downloadCtx, i.chooseDownloadNode(downloadNodes), repData.FileHash) if err != nil { return nil, fmt.Errorf("rep read failed, err: %w", err) } @@ -140,7 +127,17 @@ func (i *RepObjectIterator) chooseDownloadNode(entries []DownloadNodeInfo) Downl return entries[rand.Intn(len(entries))] } -func downloadFile(ctx *DownloadContext, nodeID int64, nodeIP string, grpcPort int, fileHash string) (io.ReadCloser, error) { +func downloadFile(ctx *DownloadContext, node DownloadNodeInfo, fileHash string) (io.ReadCloser, error) { + // 如果客户端与节点在同一个地域,则使用内网地址连接节点 + nodeIP := node.Node.ExternalIP + grpcPort := node.Node.ExternalGRPCPort + if node.IsSameLocation { + nodeIP = node.Node.LocalIP + grpcPort = node.Node.LocalGRPCPort + + logger.Infof("client and node %d are at the same location, use local ip", node.Node.NodeID) + } + if stgglb.IPFSPool != nil { logger.Infof("try to use local IPFS to download file") @@ -152,7 +149,7 @@ func downloadFile(ctx *DownloadContext, nodeID int64, nodeIP string, grpcPort in logger.Warnf("download from local IPFS failed, so try to download from node %s, err: %s", nodeIP, err.Error()) } - return downloadFromNode(ctx, nodeID, nodeIP, grpcPort, fileHash) + return downloadFromNode(ctx, node.Node.NodeID, nodeIP, grpcPort, fileHash) } func downloadFromNode(ctx *DownloadContext, nodeID int64, nodeIP string, grpcPort int, fileHash string) (io.ReadCloser, error) { diff --git a/common/pkgs/mq/agent/io.go b/common/pkgs/mq/agent/io.go new file mode 100644 index 0000000..4a79cd8 --- /dev/null +++ b/common/pkgs/mq/agent/io.go @@ -0,0 +1,120 @@ +package agent + +import ( + "gitlink.org.cn/cloudream/common/pkgs/mq" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +type IOService interface { + SetupIOPlan(msg *SetupIOPlan) (*SetupIOPlanResp, *mq.CodeMessage) + + StartIOPlan(msg *StartIOPlan) (*StartIOPlanResp, *mq.CodeMessage) + + WaitIOPlan(msg *WaitIOPlan) (*WaitIOPlanResp, *mq.CodeMessage) + + CancelIOPlan(msg *CancelIOPlan) (*CancelIOPlanResp, *mq.CodeMessage) +} + +// 设置io计划 +var _ = Register(Service.SetupIOPlan) + +type SetupIOPlan struct { + mq.MessageBodyBase + Plan ioswitch.Plan `json:"plan"` +} +type SetupIOPlanResp struct { + mq.MessageBodyBase +} + +func NewSetupIOPlan(plan ioswitch.Plan) *SetupIOPlan { + return &SetupIOPlan{ + Plan: plan, + } +} +func NewSetupIOPlanResp() *SetupIOPlanResp { + return &SetupIOPlanResp{} +} +func (client *Client) SetupIOPlan(msg *SetupIOPlan, opts ...mq.RequestOption) (*SetupIOPlanResp, error) { + return mq.Request(Service.SetupIOPlan, client.rabbitCli, msg, opts...) +} + +// 启动io计划 +var _ = Register(Service.StartIOPlan) + +type StartIOPlan struct { + mq.MessageBodyBase + PlanID ioswitch.PlanID `json:"planID"` +} +type StartIOPlanResp struct { + mq.MessageBodyBase + TaskID string `json:"taskID"` +} + +func NewStartIOPlan(planID ioswitch.PlanID) *StartIOPlan { + return &StartIOPlan{ + PlanID: planID, + } +} +func NewStartIOPlanResp(taskID string) *StartIOPlanResp { + return &StartIOPlanResp{ + TaskID: taskID, + } +} +func (client *Client) StartIOPlan(msg *StartIOPlan, opts ...mq.RequestOption) (*StartIOPlanResp, error) { + return mq.Request(Service.StartIOPlan, client.rabbitCli, msg, opts...) +} + +// 启动io计划 +var _ = Register(Service.WaitIOPlan) + +type WaitIOPlan struct { + mq.MessageBodyBase + TaskID string `json:"taskID"` + WaitTimeoutMs int64 `json:"waitTimeout"` +} +type WaitIOPlanResp struct { + mq.MessageBodyBase + IsComplete bool `json:"isComplete"` + Error string `json:"error"` + Result ioswitch.PlanResult `json:"result"` +} + +func NewWaitIOPlan(taskID string, waitTimeoutMs int64) *WaitIOPlan { + return &WaitIOPlan{ + TaskID: taskID, + WaitTimeoutMs: waitTimeoutMs, + } +} +func NewWaitIOPlanResp(isComplete bool, err string, result ioswitch.PlanResult) *WaitIOPlanResp { + return &WaitIOPlanResp{ + IsComplete: isComplete, + Error: err, + Result: result, + } +} +func (client *Client) WaitIOPlan(msg *WaitIOPlan, opts ...mq.RequestOption) (*WaitIOPlanResp, error) { + return mq.Request(Service.WaitIOPlan, client.rabbitCli, msg, opts...) +} + +// 取消io计划 +var _ = Register(Service.CancelIOPlan) + +type CancelIOPlan struct { + mq.MessageBodyBase + PlanID ioswitch.PlanID `json:"planID"` +} +type CancelIOPlanResp struct { + mq.MessageBodyBase +} + +func NewCancelIOPlan(planID ioswitch.PlanID) *CancelIOPlan { + return &CancelIOPlan{ + PlanID: planID, + } +} +func NewCancelIOPlanResp() *CancelIOPlanResp { + return &CancelIOPlanResp{} +} +func (client *Client) CancelIOPlan(msg *CancelIOPlan, opts ...mq.RequestOption) (*CancelIOPlanResp, error) { + return mq.Request(Service.CancelIOPlan, client.rabbitCli, msg, opts...) +} diff --git a/common/pkgs/mq/agent/server.go b/common/pkgs/mq/agent/server.go index 8819c17..40b8706 100644 --- a/common/pkgs/mq/agent/server.go +++ b/common/pkgs/mq/agent/server.go @@ -6,6 +6,8 @@ import ( ) type Service interface { + IOService + ObjectService StorageService