diff --git a/agent/internal/services/grpc/io.go b/agent/internal/services/grpc/io.go new file mode 100644 index 0000000..8d016a7 --- /dev/null +++ b/agent/internal/services/grpc/io.go @@ -0,0 +1,136 @@ +package grpc + +import ( + "fmt" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + myio "gitlink.org.cn/cloudream/common/utils/io" + agentserver "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +func (s *Service) SendStream(server agentserver.Agent_SendStreamServer) error { + msg, err := server.Recv() + if err != nil { + return fmt.Errorf("recving stream id packet: %w", err) + } + if msg.Type != agentserver.StreamDataPacketType_SendArgs { + return fmt.Errorf("first packet must be a SendArgs packet") + } + + logger. + WithField("PlanID", msg.PlanID). + WithField("StreamID", msg.StreamID). + Debugf("receive stream from grpc") + + pr, pw := io.Pipe() + + s.sw.StreamReady(ioswitch.PlanID(msg.PlanID), ioswitch.NewStream(ioswitch.StreamID(msg.StreamID), pr)) + + // 然后读取文件数据 + var recvSize int64 + for { + msg, err := server.Recv() + + // 读取客户端数据失败 + // 即使err是io.EOF,只要没有收到客户端包含EOF数据包就被断开了连接,就认为接收失败 + if err != nil { + // 关闭文件写入,不需要返回的hash和error + pw.CloseWithError(io.ErrClosedPipe) + logger.WithField("ReceiveSize", recvSize). + Warnf("recv message failed, err: %s", err.Error()) + return fmt.Errorf("recv message failed, err: %w", err) + } + + err = myio.WriteAll(pw, msg.Data) + if err != nil { + // 关闭文件写入,不需要返回的hash和error + pw.CloseWithError(io.ErrClosedPipe) + logger.Warnf("write data to file failed, err: %s", err.Error()) + return fmt.Errorf("write data to file failed, err: %w", err) + } + + recvSize += int64(len(msg.Data)) + + if msg.Type == agentserver.StreamDataPacketType_EOF { + // 客户端明确说明文件传输已经结束,那么结束写入,获得文件Hash + err := pw.Close() + if err != nil { + logger.Warnf("finish writing failed, err: %s", err.Error()) + return fmt.Errorf("finish writing failed, err: %w", err) + } + + // 并将结果返回到客户端 + err = server.SendAndClose(&agentserver.SendStreamResp{}) + if err != nil { + logger.Warnf("send response failed, err: %s", err.Error()) + return fmt.Errorf("send response failed, err: %w", err) + } + + return nil + } + } +} + +func (s *Service) FetchStream(req *agentserver.FetchStreamReq, server agentserver.Agent_FetchStreamServer) error { + logger. + WithField("PlanID", req.PlanID). + WithField("StreamID", req.StreamID). + Debugf("send stream by grpc") + + strs, err := s.sw.WaitStreams(ioswitch.PlanID(req.PlanID), ioswitch.StreamID(req.StreamID)) + if err != nil { + logger. + WithField("PlanID", req.PlanID). + WithField("StreamID", req.StreamID). + Warnf("watting stream: %s", err.Error()) + return fmt.Errorf("watting stream: %w", err) + } + + reader := strs[0].Stream + defer reader.Close() + + buf := make([]byte, 4096) + readAllCnt := 0 + for { + readCnt, err := reader.Read(buf) + + if readCnt > 0 { + readAllCnt += readCnt + err = server.Send(&agentserver.StreamDataPacket{ + Type: agentserver.StreamDataPacketType_Data, + Data: buf[:readCnt], + }) + if err != nil { + logger. + WithField("PlanID", req.PlanID). + WithField("StreamID", req.StreamID). + Warnf("send stream data failed, err: %s", err.Error()) + return fmt.Errorf("send stream data failed, err: %w", err) + } + } + + // 文件读取完毕 + if err == io.EOF { + logger. + WithField("PlanID", req.PlanID). + WithField("StreamID", req.StreamID). + Debugf("send data size %d", readAllCnt) + // 发送EOF消息 + server.Send(&agentserver.StreamDataPacket{ + Type: agentserver.StreamDataPacketType_EOF, + }) + return nil + } + + // io.ErrUnexpectedEOF没有读满整个buf就遇到了EOF,此时正常发送剩余数据即可。除了这两个错误之外,其他错误都中断操作 + if err != nil && err != io.ErrUnexpectedEOF { + logger. + WithField("PlanID", req.PlanID). + WithField("StreamID", req.StreamID). + Warnf("reading stream data: %s", err.Error()) + return fmt.Errorf("reading stream data: %w", err) + } + } +} diff --git a/agent/internal/services/grpc/service.go b/agent/internal/services/grpc/service.go index 1f060d2..bff1bb8 100644 --- a/agent/internal/services/grpc/service.go +++ b/agent/internal/services/grpc/service.go @@ -8,14 +8,18 @@ import ( myio "gitlink.org.cn/cloudream/common/utils/io" stgglb "gitlink.org.cn/cloudream/storage/common/globals" agentserver "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) type Service struct { agentserver.AgentServer + sw *ioswitch.Switch } -func NewService() *Service { - return &Service{} +func NewService(sw *ioswitch.Switch) *Service { + return &Service{ + sw: sw, + } } func (s *Service) SendIPFSFile(server agentserver.Agent_SendIPFSFileServer) error { @@ -59,7 +63,7 @@ func (s *Service) SendIPFSFile(server agentserver.Agent_SendIPFSFileServer) erro recvSize += int64(len(msg.Data)) - if msg.Type == agentserver.FileDataPacketType_EOF { + if msg.Type == agentserver.StreamDataPacketType_EOF { // 客户端明确说明文件传输已经结束,那么结束写入,获得文件Hash hash, err := writer.Finish() if err != nil { @@ -106,7 +110,7 @@ func (s *Service) GetIPFSFile(req *agentserver.GetIPFSFileReq, server agentserve if readCnt > 0 { readAllCnt += readCnt err = server.Send(&agentserver.FileDataPacket{ - Type: agentserver.FileDataPacketType_Data, + Type: agentserver.StreamDataPacketType_Data, Data: buf[:readCnt], }) if err != nil { @@ -121,7 +125,7 @@ func (s *Service) GetIPFSFile(req *agentserver.GetIPFSFileReq, server agentserve log.WithField("FileHash", req.FileHash).Debugf("send data size %d", readAllCnt) // 发送EOF消息 server.Send(&agentserver.FileDataPacket{ - Type: agentserver.FileDataPacketType_EOF, + Type: agentserver.StreamDataPacketType_EOF, }) return nil } diff --git a/agent/internal/services/mq/cache.go b/agent/internal/services/mq/cache.go index 9441b2b..0a36ff5 100644 --- a/agent/internal/services/mq/cache.go +++ b/agent/internal/services/mq/cache.go @@ -140,7 +140,7 @@ func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtm return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg, mvPkgTask.ResultCacheInfos)) } else { - if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { + if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { errMsg := "" if tsk.Error() != nil { diff --git a/agent/internal/services/mq/io.go b/agent/internal/services/mq/io.go new file mode 100644 index 0000000..86e8e74 --- /dev/null +++ b/agent/internal/services/mq/io.go @@ -0,0 +1,65 @@ +package mq + +import ( + "time" + + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" + mytask "gitlink.org.cn/cloudream/storage/agent/internal/task" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" +) + +func (svc *Service) SetupIOPlan(msg *agtmq.SetupIOPlan) (*agtmq.SetupIOPlanResp, *mq.CodeMessage) { + err := svc.sw.SetupPlan(msg.Plan) + if err != nil { + logger.WithField("PlanID", msg.Plan.ID).Warnf("adding plan: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "adding plan failed") + } + + return mq.ReplyOK(agtmq.NewSetupIOPlanResp()) +} + +func (svc *Service) StartIOPlan(msg *agtmq.StartIOPlan) (*agtmq.StartIOPlanResp, *mq.CodeMessage) { + tsk := svc.taskManager.StartNew(mytask.NewExecuteIOPlan(msg.PlanID)) + return mq.ReplyOK(agtmq.NewStartIOPlanResp(tsk.ID())) +} + +func (svc *Service) WaitIOPlan(msg *agtmq.WaitIOPlan) (*agtmq.WaitIOPlanResp, *mq.CodeMessage) { + tsk := svc.taskManager.FindByID(msg.TaskID) + if tsk == nil { + return nil, mq.Failed(errorcode.TaskNotFound, "task not found") + } + + if msg.WaitTimeoutMs == 0 { + tsk.Wait() + + errMsg := "" + if tsk.Error() != nil { + errMsg = tsk.Error().Error() + } + + planTsk := tsk.Body().(*mytask.ExecuteIOPlan) + return mq.ReplyOK(agtmq.NewWaitIOPlanResp(true, errMsg, planTsk.Result)) + + } else { + if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { + + errMsg := "" + if tsk.Error() != nil { + errMsg = tsk.Error().Error() + } + + planTsk := tsk.Body().(*mytask.ExecuteIOPlan) + return mq.ReplyOK(agtmq.NewWaitIOPlanResp(true, errMsg, planTsk.Result)) + } + + return mq.ReplyOK(agtmq.NewWaitIOPlanResp(false, "", ioswitch.PlanResult{})) + } +} + +func (svc *Service) CancelIOPlan(msg *agtmq.CancelIOPlan) (*agtmq.CancelIOPlanResp, *mq.CodeMessage) { + svc.sw.CancelPlan(msg.PlanID) + return mq.ReplyOK(agtmq.NewCancelIOPlanResp()) +} diff --git a/agent/internal/services/mq/object.go b/agent/internal/services/mq/object.go index 5331b84..e0ef5e6 100644 --- a/agent/internal/services/mq/object.go +++ b/agent/internal/services/mq/object.go @@ -43,7 +43,7 @@ func (svc *Service) WaitPinningObject(msg *agtmq.WaitPinningObject) (*agtmq.Wait return mq.ReplyOK(agtmq.NewWaitPinningObjectResp(true, errMsg)) } else { - if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { + if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { errMsg := "" if tsk.Error() != nil { diff --git a/agent/internal/services/mq/service.go b/agent/internal/services/mq/service.go index 18c3e56..55789d4 100644 --- a/agent/internal/services/mq/service.go +++ b/agent/internal/services/mq/service.go @@ -2,14 +2,17 @@ package mq import ( "gitlink.org.cn/cloudream/storage/agent/internal/task" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) type Service struct { taskManager *task.Manager + sw *ioswitch.Switch } -func NewService(taskMgr *task.Manager) *Service { +func NewService(taskMgr *task.Manager, sw *ioswitch.Switch) *Service { return &Service{ taskManager: taskMgr, + sw: sw, } } diff --git a/agent/internal/services/mq/storage.go b/agent/internal/services/mq/storage.go index 1941106..941a5e2 100644 --- a/agent/internal/services/mq/storage.go +++ b/agent/internal/services/mq/storage.go @@ -69,7 +69,7 @@ func (svc *Service) WaitStorageLoadPackage(msg *agtmq.WaitStorageLoadPackage) (* return mq.ReplyOK(agtmq.NewWaitStorageLoadPackageResp(true, errMsg, loadTsk.FullPath)) } else { - if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { + if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { errMsg := "" if tsk.Error() != nil { @@ -227,7 +227,7 @@ func (svc *Service) WaitStorageCreatePackage(msg *agtmq.WaitStorageCreatePackage if msg.WaitTimeoutMs == 0 { tsk.Wait() - } else if !tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { + } else if !tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { return mq.ReplyOK(agtmq.NewWaitStorageCreatePackageResp(false, "", 0)) } diff --git a/agent/internal/task/execute_io_plan.go b/agent/internal/task/execute_io_plan.go new file mode 100644 index 0000000..06f3ed7 --- /dev/null +++ b/agent/internal/task/execute_io_plan.go @@ -0,0 +1,45 @@ +package task + +import ( + "fmt" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/task" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +// TODO 临时使用Task来等待Plan执行进度 +type ExecuteIOPlan struct { + PlanID ioswitch.PlanID + Result ioswitch.PlanResult +} + +func NewExecuteIOPlan(planID ioswitch.PlanID) *ExecuteIOPlan { + return &ExecuteIOPlan{ + PlanID: planID, + } +} + +func (t *ExecuteIOPlan) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { + log := logger.WithType[ExecuteIOPlan]("Task") + log.Debugf("begin with %v", logger.FormatStruct(t)) + defer log.Debugf("end") + + ret, err := ctx.sw.ExecutePlan(t.PlanID) + if err != nil { + err := fmt.Errorf("executing io plan: %w", err) + log.WithField("PlanID", t.PlanID).Warn(err.Error()) + + complete(err, CompleteOption{ + RemovingDelay: time.Minute, + }) + return + } + + t.Result = ret + + complete(nil, CompleteOption{ + RemovingDelay: time.Minute, + }) +} diff --git a/agent/internal/task/task.go b/agent/internal/task/task.go index 7f0a27a..f41112c 100644 --- a/agent/internal/task/task.go +++ b/agent/internal/task/task.go @@ -3,10 +3,12 @@ package task import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/task" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" ) type TaskContext struct { distlock *distlock.Service + sw *ioswitch.Switch } // 需要在Task结束后主动调用,completing函数将在Manager加锁期间被调用, @@ -21,8 +23,9 @@ type Task = task.Task[TaskContext] type CompleteOption = task.CompleteOption -func NewManager(distlock *distlock.Service) Manager { +func NewManager(distlock *distlock.Service, sw *ioswitch.Switch) Manager { return task.NewManager(TaskContext{ distlock: distlock, + sw: sw, }) } diff --git a/agent/main.go b/agent/main.go index 29ba784..28e1a8c 100644 --- a/agent/main.go +++ b/agent/main.go @@ -13,6 +13,10 @@ import ( stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + + // TODO 注册OpUnion,但在mq包中注册会造成循环依赖,所以只能放到这里 + _ "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops" "google.golang.org/grpc" @@ -55,15 +59,17 @@ func main() { log.Fatalf("new ipfs failed, err: %s", err.Error()) } + sw := ioswitch.NewSwitch() + //处置协调端、客户端命令(可多建几个) wg := sync.WaitGroup{} wg.Add(5) - taskMgr := task.NewManager(distlock) + taskMgr := task.NewManager(distlock, &sw) // 启动命令服务器 // TODO 需要设计AgentID持久化机制 - agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr), config.Cfg().ID, &config.Cfg().RabbitMQ) + agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr, &sw), config.Cfg().ID, &config.Cfg().RabbitMQ) if err != nil { log.Fatalf("new agent server failed, err: %s", err.Error()) } @@ -83,7 +89,7 @@ func main() { } s := grpc.NewServer() - agtrpc.RegisterAgentServer(s, grpcsvc.NewService()) + agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&sw)) go serveGRPC(s, lis, &wg) go serveDistLock(distlock) diff --git a/common/models/models.go b/common/models/models.go index 5ba1a08..967c45a 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -31,10 +31,10 @@ func NewRedundancyEcData(ec EC, blocks []ObjectBlockData) ECRedundancyData { } type EC struct { - ID int `json:"id"` - Name string `json:"name"` - EcK int `json:"ecK"` - EcN int `json:"ecN"` + ID int64 `json:"id"` + K int `json:"k"` + N int `json:"n"` + ChunkSize int `json:"chunkSize"` } type ObjectBlockData struct { @@ -51,12 +51,12 @@ func NewObjectBlockData(index int, fileHash string, nodeIDs []int64) ObjectBlock } } -func NewEc(id int, name string, ecK int, ecN int) EC { +func NewEc(id int64, k int, n int, chunkSize int) EC { return EC{ - ID: id, - Name: name, - EcK: ecK, - EcN: ecN, + ID: id, + K: k, + N: n, + ChunkSize: chunkSize, } } diff --git a/common/pkgs/grpc/agent/agent.pb.go b/common/pkgs/grpc/agent/agent.pb.go index d90d461..c5f933b 100644 --- a/common/pkgs/grpc/agent/agent.pb.go +++ b/common/pkgs/grpc/agent/agent.pb.go @@ -22,49 +22,52 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -type FileDataPacketType int32 +type StreamDataPacketType int32 const ( - FileDataPacketType_Data FileDataPacketType = 0 - FileDataPacketType_EOF FileDataPacketType = 1 + StreamDataPacketType_EOF StreamDataPacketType = 0 + StreamDataPacketType_Data StreamDataPacketType = 1 + StreamDataPacketType_SendArgs StreamDataPacketType = 2 ) -// Enum value maps for FileDataPacketType. +// Enum value maps for StreamDataPacketType. var ( - FileDataPacketType_name = map[int32]string{ - 0: "Data", - 1: "EOF", + StreamDataPacketType_name = map[int32]string{ + 0: "EOF", + 1: "Data", + 2: "SendArgs", } - FileDataPacketType_value = map[string]int32{ - "Data": 0, - "EOF": 1, + StreamDataPacketType_value = map[string]int32{ + "EOF": 0, + "Data": 1, + "SendArgs": 2, } ) -func (x FileDataPacketType) Enum() *FileDataPacketType { - p := new(FileDataPacketType) +func (x StreamDataPacketType) Enum() *StreamDataPacketType { + p := new(StreamDataPacketType) *p = x return p } -func (x FileDataPacketType) String() string { +func (x StreamDataPacketType) String() string { return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) } -func (FileDataPacketType) Descriptor() protoreflect.EnumDescriptor { +func (StreamDataPacketType) Descriptor() protoreflect.EnumDescriptor { return file_pkgs_grpc_agent_agent_proto_enumTypes[0].Descriptor() } -func (FileDataPacketType) Type() protoreflect.EnumType { +func (StreamDataPacketType) Type() protoreflect.EnumType { return &file_pkgs_grpc_agent_agent_proto_enumTypes[0] } -func (x FileDataPacketType) Number() protoreflect.EnumNumber { +func (x StreamDataPacketType) Number() protoreflect.EnumNumber { return protoreflect.EnumNumber(x) } -// Deprecated: Use FileDataPacketType.Descriptor instead. -func (FileDataPacketType) EnumDescriptor() ([]byte, []int) { +// Deprecated: Use StreamDataPacketType.Descriptor instead. +func (StreamDataPacketType) EnumDescriptor() ([]byte, []int) { return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{0} } @@ -74,8 +77,8 @@ type FileDataPacket struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Type FileDataPacketType `protobuf:"varint,1,opt,name=Type,proto3,enum=FileDataPacketType" json:"Type,omitempty"` - Data []byte `protobuf:"bytes,2,opt,name=Data,proto3" json:"Data,omitempty"` + Type StreamDataPacketType `protobuf:"varint,1,opt,name=Type,proto3,enum=StreamDataPacketType" json:"Type,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=Data,proto3" json:"Data,omitempty"` } func (x *FileDataPacket) Reset() { @@ -110,11 +113,11 @@ func (*FileDataPacket) Descriptor() ([]byte, []int) { return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{0} } -func (x *FileDataPacket) GetType() FileDataPacketType { +func (x *FileDataPacket) GetType() StreamDataPacketType { if x != nil { return x.Type } - return FileDataPacketType_Data + return StreamDataPacketType_EOF } func (x *FileDataPacket) GetData() []byte { @@ -218,31 +221,218 @@ func (x *GetIPFSFileReq) GetFileHash() string { return "" } +// 注:EOF时data也可能有数据 +type StreamDataPacket struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Type StreamDataPacketType `protobuf:"varint,1,opt,name=Type,proto3,enum=StreamDataPacketType" json:"Type,omitempty"` + PlanID string `protobuf:"bytes,2,opt,name=PlanID,proto3" json:"PlanID,omitempty"` + StreamID string `protobuf:"bytes,3,opt,name=StreamID,proto3" json:"StreamID,omitempty"` + Data []byte `protobuf:"bytes,4,opt,name=Data,proto3" json:"Data,omitempty"` +} + +func (x *StreamDataPacket) Reset() { + *x = StreamDataPacket{} + if protoimpl.UnsafeEnabled { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StreamDataPacket) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamDataPacket) ProtoMessage() {} + +func (x *StreamDataPacket) ProtoReflect() protoreflect.Message { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamDataPacket.ProtoReflect.Descriptor instead. +func (*StreamDataPacket) Descriptor() ([]byte, []int) { + return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{3} +} + +func (x *StreamDataPacket) GetType() StreamDataPacketType { + if x != nil { + return x.Type + } + return StreamDataPacketType_EOF +} + +func (x *StreamDataPacket) GetPlanID() string { + if x != nil { + return x.PlanID + } + return "" +} + +func (x *StreamDataPacket) GetStreamID() string { + if x != nil { + return x.StreamID + } + return "" +} + +func (x *StreamDataPacket) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +type SendStreamResp struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *SendStreamResp) Reset() { + *x = SendStreamResp{} + if protoimpl.UnsafeEnabled { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SendStreamResp) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SendStreamResp) ProtoMessage() {} + +func (x *SendStreamResp) ProtoReflect() protoreflect.Message { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SendStreamResp.ProtoReflect.Descriptor instead. +func (*SendStreamResp) Descriptor() ([]byte, []int) { + return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{4} +} + +type FetchStreamReq struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + PlanID string `protobuf:"bytes,1,opt,name=PlanID,proto3" json:"PlanID,omitempty"` + StreamID string `protobuf:"bytes,2,opt,name=StreamID,proto3" json:"StreamID,omitempty"` +} + +func (x *FetchStreamReq) Reset() { + *x = FetchStreamReq{} + if protoimpl.UnsafeEnabled { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *FetchStreamReq) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FetchStreamReq) ProtoMessage() {} + +func (x *FetchStreamReq) ProtoReflect() protoreflect.Message { + mi := &file_pkgs_grpc_agent_agent_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FetchStreamReq.ProtoReflect.Descriptor instead. +func (*FetchStreamReq) Descriptor() ([]byte, []int) { + return file_pkgs_grpc_agent_agent_proto_rawDescGZIP(), []int{5} +} + +func (x *FetchStreamReq) GetPlanID() string { + if x != nil { + return x.PlanID + } + return "" +} + +func (x *FetchStreamReq) GetStreamID() string { + if x != nil { + return x.StreamID + } + return "" +} + var File_pkgs_grpc_agent_agent_proto protoreflect.FileDescriptor var file_pkgs_grpc_agent_agent_proto_rawDesc = []byte{ 0x0a, 0x1b, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x61, 0x67, 0x65, 0x6e, - 0x74, 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4d, 0x0a, + 0x74, 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x4f, 0x0a, 0x0e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x12, - 0x27, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x13, 0x2e, - 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, - 0x70, 0x65, 0x52, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x22, 0x2e, 0x0a, 0x10, - 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, + 0x29, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, + 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, + 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x22, 0x2e, + 0x0a, 0x10, 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, + 0x73, 0x70, 0x12, 0x1a, 0x0a, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x22, 0x2c, + 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x12, 0x1a, 0x0a, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x22, 0x2c, 0x0a, 0x0e, - 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x12, 0x1a, - 0x0a, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x2a, 0x27, 0x0a, 0x12, 0x46, 0x69, - 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, - 0x12, 0x08, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x45, 0x4f, - 0x46, 0x10, 0x01, 0x32, 0x74, 0x0a, 0x05, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, 0x36, 0x0a, 0x0c, - 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, 0x46, - 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x11, 0x2e, - 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x73, 0x70, - 0x22, 0x00, 0x28, 0x01, 0x12, 0x33, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46, - 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, - 0x65, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, + 0x28, 0x09, 0x52, 0x08, 0x46, 0x69, 0x6c, 0x65, 0x48, 0x61, 0x73, 0x68, 0x22, 0x85, 0x01, 0x0a, + 0x10, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, + 0x74, 0x12, 0x29, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, + 0x15, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, + 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x16, 0x0a, 0x06, + 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x6c, + 0x61, 0x6e, 0x49, 0x44, 0x12, 0x1a, 0x0a, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, + 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, + 0x44, 0x61, 0x74, 0x61, 0x22, 0x10, 0x0a, 0x0e, 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x22, 0x44, 0x0a, 0x0e, 0x46, 0x65, 0x74, 0x63, 0x68, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x12, 0x16, 0x0a, 0x06, 0x50, 0x6c, 0x61, 0x6e, + 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x50, 0x6c, 0x61, 0x6e, 0x49, 0x44, + 0x12, 0x1a, 0x0a, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x08, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x44, 0x2a, 0x37, 0x0a, 0x14, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, + 0x54, 0x79, 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x45, 0x4f, 0x46, 0x10, 0x00, 0x12, 0x08, 0x0a, + 0x04, 0x44, 0x61, 0x74, 0x61, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x53, 0x65, 0x6e, 0x64, 0x41, + 0x72, 0x67, 0x73, 0x10, 0x02, 0x32, 0xe1, 0x01, 0x0a, 0x05, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, + 0x36, 0x0a, 0x0c, 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12, + 0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, + 0x1a, 0x11, 0x2e, 0x53, 0x65, 0x6e, 0x64, 0x49, 0x50, 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x52, + 0x65, 0x73, 0x70, 0x22, 0x00, 0x28, 0x01, 0x12, 0x33, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x49, 0x50, + 0x46, 0x53, 0x46, 0x69, 0x6c, 0x65, 0x12, 0x0f, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x50, 0x46, 0x53, + 0x46, 0x69, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x44, 0x61, + 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, 0x30, 0x01, 0x12, 0x34, 0x0a, 0x0a, + 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x11, 0x2e, 0x53, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x0f, 0x2e, + 0x53, 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x22, 0x00, + 0x28, 0x01, 0x12, 0x35, 0x0a, 0x0b, 0x46, 0x65, 0x74, 0x63, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x12, 0x0f, 0x2e, 0x46, 0x65, 0x74, 0x63, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, + 0x65, 0x71, 0x1a, 0x11, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x44, 0x61, 0x74, 0x61, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, 0x30, 0x01, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x3b, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } @@ -260,24 +450,32 @@ func file_pkgs_grpc_agent_agent_proto_rawDescGZIP() []byte { } var file_pkgs_grpc_agent_agent_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_pkgs_grpc_agent_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_pkgs_grpc_agent_agent_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_pkgs_grpc_agent_agent_proto_goTypes = []interface{}{ - (FileDataPacketType)(0), // 0: FileDataPacketType - (*FileDataPacket)(nil), // 1: FileDataPacket - (*SendIPFSFileResp)(nil), // 2: SendIPFSFileResp - (*GetIPFSFileReq)(nil), // 3: GetIPFSFileReq + (StreamDataPacketType)(0), // 0: StreamDataPacketType + (*FileDataPacket)(nil), // 1: FileDataPacket + (*SendIPFSFileResp)(nil), // 2: SendIPFSFileResp + (*GetIPFSFileReq)(nil), // 3: GetIPFSFileReq + (*StreamDataPacket)(nil), // 4: StreamDataPacket + (*SendStreamResp)(nil), // 5: SendStreamResp + (*FetchStreamReq)(nil), // 6: FetchStreamReq } var file_pkgs_grpc_agent_agent_proto_depIdxs = []int32{ - 0, // 0: FileDataPacket.Type:type_name -> FileDataPacketType - 1, // 1: Agent.SendIPFSFile:input_type -> FileDataPacket - 3, // 2: Agent.GetIPFSFile:input_type -> GetIPFSFileReq - 2, // 3: Agent.SendIPFSFile:output_type -> SendIPFSFileResp - 1, // 4: Agent.GetIPFSFile:output_type -> FileDataPacket - 3, // [3:5] is the sub-list for method output_type - 1, // [1:3] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name + 0, // 0: FileDataPacket.Type:type_name -> StreamDataPacketType + 0, // 1: StreamDataPacket.Type:type_name -> StreamDataPacketType + 1, // 2: Agent.SendIPFSFile:input_type -> FileDataPacket + 3, // 3: Agent.GetIPFSFile:input_type -> GetIPFSFileReq + 4, // 4: Agent.SendStream:input_type -> StreamDataPacket + 6, // 5: Agent.FetchStream:input_type -> FetchStreamReq + 2, // 6: Agent.SendIPFSFile:output_type -> SendIPFSFileResp + 1, // 7: Agent.GetIPFSFile:output_type -> FileDataPacket + 5, // 8: Agent.SendStream:output_type -> SendStreamResp + 4, // 9: Agent.FetchStream:output_type -> StreamDataPacket + 6, // [6:10] is the sub-list for method output_type + 2, // [2:6] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_pkgs_grpc_agent_agent_proto_init() } @@ -322,6 +520,42 @@ func file_pkgs_grpc_agent_agent_proto_init() { return nil } } + file_pkgs_grpc_agent_agent_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StreamDataPacket); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkgs_grpc_agent_agent_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SendStreamResp); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkgs_grpc_agent_agent_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*FetchStreamReq); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -329,7 +563,7 @@ func file_pkgs_grpc_agent_agent_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_pkgs_grpc_agent_agent_proto_rawDesc, NumEnums: 1, - NumMessages: 3, + NumMessages: 6, NumExtensions: 0, NumServices: 1, }, diff --git a/common/pkgs/grpc/agent/agent.proto b/common/pkgs/grpc/agent/agent.proto index d8f53b4..f756fec 100644 --- a/common/pkgs/grpc/agent/agent.proto +++ b/common/pkgs/grpc/agent/agent.proto @@ -5,13 +5,14 @@ syntax = "proto3"; option go_package = ".;agent";//grpc这里生效了 -enum FileDataPacketType { - Data = 0; - EOF = 1; +enum StreamDataPacketType { + EOF = 0; + Data = 1; + SendArgs = 2; } // 文件数据。注意:只在Type为Data的时候,Data字段才能有数据 message FileDataPacket { - FileDataPacketType Type = 1; + StreamDataPacketType Type = 1; bytes Data = 2; } @@ -23,8 +24,27 @@ message GetIPFSFileReq { string FileHash = 1; } +// 注:EOF时data也可能有数据 +message StreamDataPacket { + StreamDataPacketType Type = 1; + string PlanID = 2; + string StreamID = 3; + bytes Data = 4; +} + +message SendStreamResp { +} + +message FetchStreamReq { + string PlanID = 1; + string StreamID = 2; +} + service Agent { rpc SendIPFSFile(stream FileDataPacket)returns(SendIPFSFileResp){} rpc GetIPFSFile(GetIPFSFileReq)returns(stream FileDataPacket){} + + rpc SendStream(stream StreamDataPacket)returns(SendStreamResp){} + rpc FetchStream(FetchStreamReq)returns(stream StreamDataPacket){} } diff --git a/common/pkgs/grpc/agent/agent_grpc.pb.go b/common/pkgs/grpc/agent/agent_grpc.pb.go index d32adde..95b2f92 100644 --- a/common/pkgs/grpc/agent/agent_grpc.pb.go +++ b/common/pkgs/grpc/agent/agent_grpc.pb.go @@ -23,6 +23,8 @@ const _ = grpc.SupportPackageIsVersion7 const ( Agent_SendIPFSFile_FullMethodName = "/Agent/SendIPFSFile" Agent_GetIPFSFile_FullMethodName = "/Agent/GetIPFSFile" + Agent_SendStream_FullMethodName = "/Agent/SendStream" + Agent_FetchStream_FullMethodName = "/Agent/FetchStream" ) // AgentClient is the client API for Agent service. @@ -31,6 +33,8 @@ const ( type AgentClient interface { SendIPFSFile(ctx context.Context, opts ...grpc.CallOption) (Agent_SendIPFSFileClient, error) GetIPFSFile(ctx context.Context, in *GetIPFSFileReq, opts ...grpc.CallOption) (Agent_GetIPFSFileClient, error) + SendStream(ctx context.Context, opts ...grpc.CallOption) (Agent_SendStreamClient, error) + FetchStream(ctx context.Context, in *FetchStreamReq, opts ...grpc.CallOption) (Agent_FetchStreamClient, error) } type agentClient struct { @@ -107,12 +111,80 @@ func (x *agentGetIPFSFileClient) Recv() (*FileDataPacket, error) { return m, nil } +func (c *agentClient) SendStream(ctx context.Context, opts ...grpc.CallOption) (Agent_SendStreamClient, error) { + stream, err := c.cc.NewStream(ctx, &Agent_ServiceDesc.Streams[2], Agent_SendStream_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &agentSendStreamClient{stream} + return x, nil +} + +type Agent_SendStreamClient interface { + Send(*StreamDataPacket) error + CloseAndRecv() (*SendStreamResp, error) + grpc.ClientStream +} + +type agentSendStreamClient struct { + grpc.ClientStream +} + +func (x *agentSendStreamClient) Send(m *StreamDataPacket) error { + return x.ClientStream.SendMsg(m) +} + +func (x *agentSendStreamClient) CloseAndRecv() (*SendStreamResp, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(SendStreamResp) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *agentClient) FetchStream(ctx context.Context, in *FetchStreamReq, opts ...grpc.CallOption) (Agent_FetchStreamClient, error) { + stream, err := c.cc.NewStream(ctx, &Agent_ServiceDesc.Streams[3], Agent_FetchStream_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &agentFetchStreamClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Agent_FetchStreamClient interface { + Recv() (*StreamDataPacket, error) + grpc.ClientStream +} + +type agentFetchStreamClient struct { + grpc.ClientStream +} + +func (x *agentFetchStreamClient) Recv() (*StreamDataPacket, error) { + m := new(StreamDataPacket) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // AgentServer is the server API for Agent service. // All implementations must embed UnimplementedAgentServer // for forward compatibility type AgentServer interface { SendIPFSFile(Agent_SendIPFSFileServer) error GetIPFSFile(*GetIPFSFileReq, Agent_GetIPFSFileServer) error + SendStream(Agent_SendStreamServer) error + FetchStream(*FetchStreamReq, Agent_FetchStreamServer) error mustEmbedUnimplementedAgentServer() } @@ -126,6 +198,12 @@ func (UnimplementedAgentServer) SendIPFSFile(Agent_SendIPFSFileServer) error { func (UnimplementedAgentServer) GetIPFSFile(*GetIPFSFileReq, Agent_GetIPFSFileServer) error { return status.Errorf(codes.Unimplemented, "method GetIPFSFile not implemented") } +func (UnimplementedAgentServer) SendStream(Agent_SendStreamServer) error { + return status.Errorf(codes.Unimplemented, "method SendStream not implemented") +} +func (UnimplementedAgentServer) FetchStream(*FetchStreamReq, Agent_FetchStreamServer) error { + return status.Errorf(codes.Unimplemented, "method FetchStream not implemented") +} func (UnimplementedAgentServer) mustEmbedUnimplementedAgentServer() {} // UnsafeAgentServer may be embedded to opt out of forward compatibility for this service. @@ -186,6 +264,53 @@ func (x *agentGetIPFSFileServer) Send(m *FileDataPacket) error { return x.ServerStream.SendMsg(m) } +func _Agent_SendStream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(AgentServer).SendStream(&agentSendStreamServer{stream}) +} + +type Agent_SendStreamServer interface { + SendAndClose(*SendStreamResp) error + Recv() (*StreamDataPacket, error) + grpc.ServerStream +} + +type agentSendStreamServer struct { + grpc.ServerStream +} + +func (x *agentSendStreamServer) SendAndClose(m *SendStreamResp) error { + return x.ServerStream.SendMsg(m) +} + +func (x *agentSendStreamServer) Recv() (*StreamDataPacket, error) { + m := new(StreamDataPacket) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _Agent_FetchStream_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(FetchStreamReq) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(AgentServer).FetchStream(m, &agentFetchStreamServer{stream}) +} + +type Agent_FetchStreamServer interface { + Send(*StreamDataPacket) error + grpc.ServerStream +} + +type agentFetchStreamServer struct { + grpc.ServerStream +} + +func (x *agentFetchStreamServer) Send(m *StreamDataPacket) error { + return x.ServerStream.SendMsg(m) +} + // Agent_ServiceDesc is the grpc.ServiceDesc for Agent service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -204,6 +329,16 @@ var Agent_ServiceDesc = grpc.ServiceDesc{ Handler: _Agent_GetIPFSFile_Handler, ServerStreams: true, }, + { + StreamName: "SendStream", + Handler: _Agent_SendStream_Handler, + ClientStreams: true, + }, + { + StreamName: "FetchStream", + Handler: _Agent_FetchStream_Handler, + ServerStreams: true, + }, }, Metadata: "pkgs/grpc/agent/agent.proto", } diff --git a/common/pkgs/grpc/agent/client.go b/common/pkgs/grpc/agent/client.go index e23a6ee..440e49a 100644 --- a/common/pkgs/grpc/agent/client.go +++ b/common/pkgs/grpc/agent/client.go @@ -5,6 +5,7 @@ import ( "fmt" "io" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -37,7 +38,7 @@ func (c *Client) SendIPFSFile(file io.Reader) (string, error) { rd, err := file.Read(buf) if err == io.EOF { err := sendCli.Send(&FileDataPacket{ - Type: FileDataPacketType_EOF, + Type: StreamDataPacketType_EOF, Data: buf[:rd], }) if err != nil { @@ -57,7 +58,7 @@ func (c *Client) SendIPFSFile(file io.Reader) (string, error) { } err = sendCli.Send(&FileDataPacket{ - Type: FileDataPacketType_Data, + Type: StreamDataPacketType_Data, Data: buf[:rd], }) if err != nil { @@ -68,7 +69,9 @@ func (c *Client) SendIPFSFile(file io.Reader) (string, error) { type fileReadCloser struct { io.ReadCloser - stream Agent_GetIPFSFileClient + // stream Agent_GetIPFSFileClient + // TODO 临时使用 + recvFn func() (*StreamDataPacket, error) cancelFn context.CancelFunc readingData []byte recvEOF bool @@ -76,15 +79,15 @@ type fileReadCloser struct { func (s *fileReadCloser) Read(p []byte) (int, error) { if len(s.readingData) == 0 && !s.recvEOF { - resp, err := s.stream.Recv() + resp, err := s.recvFn() if err != nil { return 0, err } - if resp.Type == FileDataPacketType_Data { + if resp.Type == StreamDataPacketType_Data { s.readingData = resp.Data - } else if resp.Type == FileDataPacketType_EOF { + } else if resp.Type == StreamDataPacketType_EOF { s.readingData = resp.Data s.recvEOF = true @@ -121,7 +124,87 @@ func (c *Client) GetIPFSFile(fileHash string) (io.ReadCloser, error) { } return &fileReadCloser{ - stream: stream, + // TODO 临时处理方案 + recvFn: func() (*StreamDataPacket, error) { + pkt, err := stream.Recv() + if err != nil { + return nil, err + } + + return &StreamDataPacket{ + Type: pkt.Type, + Data: pkt.Data, + }, nil + }, + cancelFn: cancel, + }, nil +} + +func (c *Client) SendStream(planID ioswitch.PlanID, streamID ioswitch.StreamID, file io.Reader) error { + sendCli, err := c.cli.SendStream(context.Background()) + if err != nil { + return err + } + + err = sendCli.Send(&StreamDataPacket{ + Type: StreamDataPacketType_SendArgs, + PlanID: string(planID), + StreamID: string(streamID), + }) + if err != nil { + return fmt.Errorf("sending stream id packet: %w", err) + } + + buf := make([]byte, 4096) + for { + rd, err := file.Read(buf) + if err == io.EOF { + err := sendCli.Send(&StreamDataPacket{ + Type: StreamDataPacketType_EOF, + StreamID: string(streamID), + Data: buf[:rd], + }) + if err != nil { + return fmt.Errorf("sending EOF packet: %w", err) + } + + _, err = sendCli.CloseAndRecv() + if err != nil { + return fmt.Errorf("receiving response: %w", err) + } + + return nil + } + + if err != nil { + return fmt.Errorf("reading file data: %w", err) + } + + err = sendCli.Send(&StreamDataPacket{ + Type: StreamDataPacketType_Data, + StreamID: string(streamID), + Data: buf[:rd], + }) + if err != nil { + return fmt.Errorf("sending data packet: %w", err) + } + } +} + +func (c *Client) FetchStream(planID ioswitch.PlanID, streamID ioswitch.StreamID) (io.ReadCloser, error) { + ctx, cancel := context.WithCancel(context.Background()) + + stream, err := c.cli.FetchStream(ctx, &FetchStreamReq{ + PlanID: string(planID), + StreamID: string(streamID), + }) + if err != nil { + cancel() + return nil, fmt.Errorf("request grpc failed, err: %w", err) + } + + return &fileReadCloser{ + recvFn: stream.Recv, cancelFn: cancel, }, nil } diff --git a/common/pkgs/ioswitch/ioswitch.go b/common/pkgs/ioswitch/ioswitch.go new file mode 100644 index 0000000..1c8fc34 --- /dev/null +++ b/common/pkgs/ioswitch/ioswitch.go @@ -0,0 +1,35 @@ +package ioswitch + +import ( + "io" +) + +type PlanID string + +type StreamID string + +type Plan struct { + ID PlanID + Ops []Op +} + +type Stream struct { + ID StreamID + Stream io.ReadCloser +} + +func NewStream(id StreamID, stream io.ReadCloser) Stream { + return Stream{ + ID: id, + Stream: stream, + } +} + +type Op interface { + Execute(sw *Switch, planID PlanID) error +} + +type ResultKV struct { + Key string + Value any +} diff --git a/common/pkgs/ioswitch/ops/ops.go b/common/pkgs/ioswitch/ops/ops.go new file mode 100644 index 0000000..d6b8717 --- /dev/null +++ b/common/pkgs/ioswitch/ops/ops.go @@ -0,0 +1,198 @@ +package ops + +import ( + "context" + "fmt" + "io" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/types" + myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/common/utils/serder" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[ioswitch.Op]( + (*IPFSRead)(nil), + (*IPFSWrite)(nil), + (*GRPCSend)(nil), + (*GRPCFetch)(nil), + (*ECCompute)(nil), + (*Combine)(nil), +))) + +type IPFSRead struct { + Output ioswitch.StreamID `json:"output"` + FileHash string `json:"fileHash"` +} + +func (o *IPFSRead) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + logger. + WithField("FileHash", o.FileHash). + WithField("Output", o.Output). + Debugf("ipfs read op") + defer logger.Debugf("ipfs read op finished") + + ipfsCli, err := stgglb.IPFSPool.Acquire() + if err != nil { + return fmt.Errorf("new ipfs client: %w", err) + } + defer stgglb.IPFSPool.Release(ipfsCli) + + file, err := ipfsCli.OpenRead(o.FileHash) + if err != nil { + return fmt.Errorf("reading ipfs: %w", err) + } + + fut := future.NewSetVoid() + file = myio.AfterReadClosed(file, func(closer io.ReadCloser) { + fut.SetVoid() + }) + + sw.StreamReady(planID, ioswitch.NewStream(o.Output, file)) + + // TODO context + fut.Wait(context.TODO()) + return nil +} + +type IPFSWrite struct { + Input ioswitch.StreamID `json:"input"` + ResultKey string `json:"resultKey"` +} + +func (o *IPFSWrite) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + logger. + WithField("ResultKey", o.ResultKey). + WithField("Input", o.Input). + Debugf("ipfs write op") + + ipfsCli, err := stgglb.IPFSPool.Acquire() + if err != nil { + return fmt.Errorf("new ipfs client: %w", err) + } + defer stgglb.IPFSPool.Release(ipfsCli) + + strs, err := sw.WaitStreams(planID, o.Input) + if err != nil { + return err + } + defer strs[0].Stream.Close() + + fileHash, err := ipfsCli.CreateFile(strs[0].Stream) + if err != nil { + return fmt.Errorf("creating ipfs file: %w", err) + } + + if o.ResultKey != "" { + sw.AddResultValue(planID, ioswitch.ResultKV{ + Key: o.ResultKey, + Value: fileHash, + }) + } + + return nil +} + +type GRPCSend struct { + StreamID ioswitch.StreamID `json:"streamID"` + Node model.Node `json:"node"` +} + +func (o *GRPCSend) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + logger. + WithField("ioswitch.StreamID", o.StreamID). + Debugf("grpc send") + + strs, err := sw.WaitStreams(planID, o.StreamID) + if err != nil { + return err + } + defer strs[0].Stream.Close() + + // TODO 根据客户端地址选择IP和端口 + agtCli, err := stgglb.AgentRPCPool.Acquire(o.Node.ExternalIP, o.Node.ExternalGRPCPort) + if err != nil { + return fmt.Errorf("new agent rpc client: %w", err) + } + defer stgglb.AgentRPCPool.Release(agtCli) + + err = agtCli.SendStream(planID, o.StreamID, strs[0].Stream) + if err != nil { + return fmt.Errorf("sending stream: %w", err) + } + + return nil +} + +type GRPCFetch struct { + StreamID ioswitch.StreamID `json:"streamID"` + Node model.Node `json:"node"` +} + +func (o *GRPCFetch) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + // TODO 根据客户端地址选择IP和端口 + agtCli, err := stgglb.AgentRPCPool.Acquire(o.Node.ExternalIP, o.Node.ExternalGRPCPort) + if err != nil { + return fmt.Errorf("new agent rpc client: %w", err) + } + defer stgglb.AgentRPCPool.Release(agtCli) + + str, err := agtCli.FetchStream(planID, o.StreamID) + if err != nil { + return fmt.Errorf("fetching stream: %w", err) + } + + fut := future.NewSetVoid() + str = myio.AfterReadClosed(str, func(closer io.ReadCloser) { + fut.SetVoid() + }) + + sw.StreamReady(planID, ioswitch.NewStream(o.StreamID, str)) + + // TODO + fut.Wait(context.TODO()) + + return err +} + +type ECCompute struct { + EC stgmod.EC `json:"ec"` + InputIDs []ioswitch.StreamID `json:"inputIDs"` + OutputIDs []ioswitch.StreamID `json:"outputIDs"` + InputBlockIndexes []int `json:"inputBlockIndexes"` + OutputBlockIndexes []int `json:"outputBlockIndexes"` +} + +func (o *ECCompute) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + // TODO2 + return nil +} + +type Combine struct { + InputIDs []ioswitch.StreamID `json:"inputIDs"` + OutputID ioswitch.StreamID `json:"outputID"` +} + +func (o *Combine) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + strs, err := sw.WaitStreams(planID, o.InputIDs...) + if err != nil { + return err + } + + pr, pw := io.Pipe() + sw.StreamReady(planID, ioswitch.NewStream(o.OutputID, pr)) + + for _, str := range strs { + _, err := io.Copy(pw, str.Stream) + if err != nil { + return err + } + } + + return nil +} diff --git a/common/pkgs/ioswitch/plans/executor.go b/common/pkgs/ioswitch/plans/executor.go new file mode 100644 index 0000000..6facacf --- /dev/null +++ b/common/pkgs/ioswitch/plans/executor.go @@ -0,0 +1,172 @@ +package plans + +import ( + "errors" + "fmt" + "io" + "sync" + "sync/atomic" + + "gitlink.org.cn/cloudream/common/pkgs/future" + myio "gitlink.org.cn/cloudream/common/utils/io" + stgglb "gitlink.org.cn/cloudream/storage/common/globals" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" +) + +type ExecutorResult struct { + ResultValues map[string]any +} + +type Executor struct { + plan ComposedPlan + callback *future.SetValueFuture[ExecutorResult] + mqClis []*agtmq.Client + planTaskIDs []string +} + +func Execute(plan ComposedPlan) (*Executor, error) { + executor := Executor{ + plan: plan, + callback: future.NewSetValue[ExecutorResult](), + } + + var err error + for _, a := range plan.AgentPlans { + var cli *agtmq.Client + cli, err = stgglb.AgentMQPool.Acquire(a.Node.NodeID) + if err != nil { + executor.Close() + return nil, fmt.Errorf("new mq client for %d: %w", a.Node.NodeID, err) + } + + executor.mqClis = append(executor.mqClis, cli) + } + + for i, a := range plan.AgentPlans { + cli := executor.mqClis[i] + + _, err := cli.SetupIOPlan(agtmq.NewSetupIOPlan(a.Plan)) + if err != nil { + for i -= 1; i >= 0; i-- { + executor.mqClis[i].CancelIOPlan(agtmq.NewCancelIOPlan(plan.ID)) + } + executor.Close() + return nil, fmt.Errorf("setup plan at %d: %w", a.Node.NodeID, err) + } + } + + for i, a := range plan.AgentPlans { + cli := executor.mqClis[i] + + resp, err := cli.StartIOPlan(agtmq.NewStartIOPlan(a.Plan.ID)) + if err != nil { + executor.cancelAll() + executor.Close() + return nil, fmt.Errorf("setup plan at %d: %w", a.Node.NodeID, err) + } + + executor.planTaskIDs = append(executor.planTaskIDs, resp.TaskID) + } + + go executor.pollResult() + + return &executor, nil +} + +func (e *Executor) SendStream(info *FromExecutorStream, stream io.Reader) error { + // TODO 根据地域选择IP + agtCli, err := stgglb.AgentRPCPool.Acquire(info.toNode.ExternalIP, info.toNode.ExternalGRPCPort) + if err != nil { + return fmt.Errorf("new agent rpc client: %w", err) + } + defer stgglb.AgentRPCPool.Release(agtCli) + + return agtCli.SendStream(e.plan.ID, info.info.ID, stream) +} + +func (e *Executor) ReadStream(info *ToExecutorStream) (io.ReadCloser, error) { + // TODO 根据地域选择IP + agtCli, err := stgglb.AgentRPCPool.Acquire(info.fromNode.ExternalIP, info.fromNode.ExternalGRPCPort) + if err != nil { + return nil, fmt.Errorf("new agent rpc client: %w", err) + } + + str, err := agtCli.FetchStream(e.plan.ID, info.info.ID) + if err != nil { + return nil, err + } + + return myio.AfterReadClosed(str, func(closer io.ReadCloser) { + stgglb.AgentRPCPool.Release(agtCli) + }), nil +} + +func (e *Executor) cancelAll() { + for _, cli := range e.mqClis { + cli.CancelIOPlan(agtmq.NewCancelIOPlan(e.plan.ID)) + } +} + +func (e *Executor) Close() { + for _, c := range e.mqClis { + stgglb.AgentMQPool.Release(c) + } +} + +func (e *Executor) pollResult() { + wg := sync.WaitGroup{} + anyErr := atomic.Value{} + anyErr.Store(nil) + rets := make([]*ioswitch.PlanResult, len(e.plan.AgentPlans)) + + for i, id := range e.planTaskIDs { + idx := i + taskID := id + + wg.Add(1) + go func() { + defer wg.Done() + + for { + resp, err := e.mqClis[idx].WaitIOPlan(agtmq.NewWaitIOPlan(taskID, 5000)) + if err != nil { + anyErr.Store(err) + break + } + + if resp.IsComplete { + if resp.Error != "" { + anyErr.Store(errors.New(resp.Error)) + } else { + rets[idx] = &resp.Result + } + break + } + + if anyErr.Load() != nil { + break + } + } + }() + } + + wg.Wait() + + err := anyErr.Load().(error) + if err != nil { + e.callback.SetError(err) + return + } + + reducedRet := ExecutorResult{ + ResultValues: make(map[string]any), + } + for _, ret := range rets { + for k, v := range ret.Values { + reducedRet.ResultValues[k] = v + } + } + + e.callback.SetValue(reducedRet) +} diff --git a/common/pkgs/ioswitch/plans/plan_builder.go b/common/pkgs/ioswitch/plans/plan_builder.go new file mode 100644 index 0000000..d831cd2 --- /dev/null +++ b/common/pkgs/ioswitch/plans/plan_builder.go @@ -0,0 +1,229 @@ +package plans + +import ( + "fmt" + + "github.com/google/uuid" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops" +) + +type StreamInfo struct { + ID ioswitch.StreamID +} + +type PlanBuilder struct { + streams []*StreamInfo + agentPlans map[int64]*AgentPlanBuilder +} + +func (b *PlanBuilder) Build() (*ComposedPlan, error) { + planID := uuid.NewString() + + var agentPlans []AgentPlan + for _, b := range b.agentPlans { + plan, err := b.Build(ioswitch.PlanID(planID)) + if err != nil { + return nil, err + } + + agentPlans = append(agentPlans, plan) + } + + return &ComposedPlan{ + ID: ioswitch.PlanID(planID), + AgentPlans: agentPlans, + }, nil +} + +func (b *PlanBuilder) newStream() *StreamInfo { + str := &StreamInfo{ + ID: ioswitch.StreamID(fmt.Sprintf("%d", len(b.streams)+1)), + } + + b.streams = append(b.streams, str) + + return str +} + +func NewPlanBuilder() PlanBuilder { + return PlanBuilder{ + agentPlans: make(map[int64]*AgentPlanBuilder), + } +} + +func (b *PlanBuilder) FromExecutor() *FromExecutorStream { + return &FromExecutorStream{ + owner: b, + info: b.newStream(), + } +} + +func (b *PlanBuilder) AtAgent(node model.Node) *AgentPlanBuilder { + agtPlan, ok := b.agentPlans[node.NodeID] + if !ok { + agtPlan = &AgentPlanBuilder{ + owner: b, + node: node, + } + b.agentPlans[node.NodeID] = agtPlan + } + + return agtPlan +} + +type FromExecutorStream struct { + owner *PlanBuilder + info *StreamInfo + toNode *model.Node +} + +func (s *FromExecutorStream) ToNode(node model.Node) *AgentStream { + s.toNode = &node + return &AgentStream{ + owner: s.owner.AtAgent(node), + info: s.owner.newStream(), + } +} + +type ToExecutorStream struct { + info *StreamInfo + fromNode *model.Node +} + +type AgentStream struct { + owner *AgentPlanBuilder + info *StreamInfo +} + +func (s *AgentStream) IPFSWrite(resultKey string) { + s.owner.ops = append(s.owner.ops, &ops.IPFSWrite{ + Input: s.info.ID, + ResultKey: resultKey, + }) +} + +func (s *AgentStream) GRPCSend(node model.Node) *AgentStream { + agtStr := &AgentStream{ + owner: s.owner.owner.AtAgent(node), + info: s.owner.owner.newStream(), + } + + s.owner.ops = append(s.owner.ops, &ops.GRPCSend{ + StreamID: s.info.ID, + Node: node, + }) + + return agtStr +} + +func (s *AgentStream) ToExecutor() *ToExecutorStream { + return &ToExecutorStream{ + info: s.info, + fromNode: &s.owner.node, + } +} + +type AgentPlanBuilder struct { + owner *PlanBuilder + node model.Node + ops []ioswitch.Op +} + +func (b *AgentPlanBuilder) GRCPFetch(node model.Node) *AgentStream { + agtStr := &AgentStream{ + owner: b, + info: b.owner.newStream(), + } + + b.ops = append(b.ops, &ops.GRPCFetch{ + StreamID: agtStr.info.ID, + Node: node, + }) + + return agtStr +} + +func (b *AgentPlanBuilder) IPFSRead(fileHash string) *AgentStream { + agtStr := &AgentStream{ + owner: b, + info: b.owner.newStream(), + } + + b.ops = append(b.ops, &ops.IPFSRead{ + Output: agtStr.info.ID, + FileHash: fileHash, + }) + + return agtStr +} + +func (b *AgentPlanBuilder) ECCompute(ec stgmod.EC, inBlockIndexes []int, outBlockIndexes []int, streams ...*AgentStream) *MultiStream { + mstr := &MultiStream{} + + var inputStrIDs []ioswitch.StreamID + for _, str := range streams { + inputStrIDs = append(inputStrIDs, str.info.ID) + } + + var outputStrIDs []ioswitch.StreamID + for i := 0; i < ec.N-ec.K; i++ { + info := b.owner.newStream() + mstr.streams[i] = &AgentStream{ + owner: b, + info: info, + } + outputStrIDs = append(outputStrIDs, info.ID) + } + + b.ops = append(b.ops, &ops.ECCompute{ + EC: ec, + InputIDs: inputStrIDs, + OutputIDs: outputStrIDs, + InputBlockIndexes: inBlockIndexes, + OutputBlockIndexes: outBlockIndexes, + }) + + return mstr +} + +func (b *AgentPlanBuilder) Combine(streams ...*AgentStream) *AgentStream { + agtStr := &AgentStream{ + owner: b, + info: b.owner.newStream(), + } + + var inputStrIDs []ioswitch.StreamID + for _, str := range streams { + inputStrIDs = append(inputStrIDs, str.info.ID) + } + + b.ops = append(b.ops, &ops.Combine{ + InputIDs: inputStrIDs, + OutputID: agtStr.info.ID, + }) + + return agtStr +} + +func (b *AgentPlanBuilder) Build(planID ioswitch.PlanID) (AgentPlan, error) { + plan := ioswitch.Plan{ + ID: planID, + Ops: b.ops, + } + + return AgentPlan{ + Plan: plan, + Node: b.node, + }, nil +} + +type MultiStream struct { + streams []*AgentStream +} + +func (m *MultiStream) Stream(index int) *AgentStream { + return m.streams[index] +} diff --git a/common/pkgs/ioswitch/plans/plans.go b/common/pkgs/ioswitch/plans/plans.go new file mode 100644 index 0000000..3ea9e8e --- /dev/null +++ b/common/pkgs/ioswitch/plans/plans.go @@ -0,0 +1,16 @@ +package plans + +import ( + "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +type AgentPlan struct { + Node model.Node + Plan ioswitch.Plan +} + +type ComposedPlan struct { + ID ioswitch.PlanID + AgentPlans []AgentPlan +} diff --git a/common/pkgs/ioswitch/switch.go b/common/pkgs/ioswitch/switch.go new file mode 100644 index 0000000..00f0bb3 --- /dev/null +++ b/common/pkgs/ioswitch/switch.go @@ -0,0 +1,291 @@ +package ioswitch + +import ( + "context" + "errors" + "fmt" + "sync" + + "gitlink.org.cn/cloudream/common/pkgs/future" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/utils/lo" +) + +var ErrPlanFinished = errors.New("plan is finished") + +var ErrPlanNotFound = errors.New("plan not found") + +type OpState string + +const ( + OpPending OpState = "Pending" + OpFinished OpState = "Finished" +) + +type Oping struct { + State OpState +} + +type PlanResult struct { + Values map[string]any `json:"values"` +} + +type Planning struct { + plan Plan + opings []Oping + resultValues map[string]any + callback *future.SetValueFuture[PlanResult] + + readys map[StreamID]Stream + waittings []*Watting +} + +func NewPlanning(plan Plan) Planning { + planning := Planning{ + plan: plan, + callback: future.NewSetValue[PlanResult](), + readys: make(map[StreamID]Stream), + } + + for _ = range plan.Ops { + oping := Oping{ + State: OpPending, + } + planning.opings = append(planning.opings, oping) + } + + return planning +} + +func (p *Planning) IsCompleted() bool { + for _, oping := range p.opings { + if oping.State != OpFinished { + return false + } + } + + return true +} + +func (p *Planning) MakeResult() PlanResult { + return PlanResult{ + Values: p.resultValues, + } +} + +type Watting struct { + WaitIDs []StreamID + Readys []Stream + Callback *future.SetValueFuture[[]Stream] +} + +func (w *Watting) TryReady(str Stream) bool { + for i, id := range w.WaitIDs { + if id == str.ID { + w.Readys[i] = str + return true + } + } + + return false +} + +func (c *Watting) IsAllReady() bool { + for _, s := range c.Readys { + if s.Stream == nil { + return false + } + } + + return true +} + +func (w *Watting) Complete() { + w.Callback.SetValue(w.Readys) +} + +func (w *Watting) Cancel(err error) { + w.Callback.SetError(err) +} + +type Switch struct { + lock sync.Mutex + plannings map[PlanID]*Planning +} + +func NewSwitch() Switch { + return Switch{ + plannings: make(map[PlanID]*Planning), + } +} + +func (s *Switch) SetupPlan(plan Plan) error { + s.lock.Lock() + defer s.lock.Unlock() + + if _, ok := s.plannings[plan.ID]; ok { + return fmt.Errorf("plan id exists") + } + + planning := NewPlanning(plan) + s.plannings[plan.ID] = &planning + return nil +} + +func (s *Switch) ExecutePlan(id PlanID) (PlanResult, error) { + s.lock.Lock() + defer s.lock.Unlock() + + planning, ok := s.plannings[id] + if !ok { + return PlanResult{}, fmt.Errorf("plan not found") + } + + for i, op := range planning.plan.Ops { + idx := i + o := op + go func() { + err := o.Execute(s, id) + + s.lock.Lock() + defer s.lock.Unlock() + + if err != nil { + logger.Std.Warnf("exeucting op: %s", err.Error()) + s.cancelPlan(id) + return + } + + planning.opings[idx].State = OpFinished + if planning.IsCompleted() { + s.completePlan(id) + } + }() + } + + return planning.callback.WaitValue(context.TODO()) +} + +func (s *Switch) CancelPlan(id PlanID) { + s.lock.Lock() + defer s.lock.Unlock() + + s.cancelPlan(id) +} + +func (s *Switch) cancelPlan(id PlanID) { + plan, ok := s.plannings[id] + if !ok { + return + } + + delete(s.plannings, id) + + for _, s := range plan.readys { + s.Stream.Close() + } + + for _, c := range plan.waittings { + c.Callback.SetError(ErrPlanFinished) + } + + plan.callback.SetError(fmt.Errorf("plan cancelled")) +} + +func (s *Switch) completePlan(id PlanID) { + plan, ok := s.plannings[id] + if !ok { + return + } + + delete(s.plannings, id) + + for _, s := range plan.readys { + s.Stream.Close() + } + + for _, c := range plan.waittings { + c.Callback.SetError(ErrPlanFinished) + } + + plan.callback.SetValue(plan.MakeResult()) +} + +func (s *Switch) StreamReady(planID PlanID, stream Stream) { + s.lock.Lock() + defer s.lock.Unlock() + + plan, ok := s.plannings[planID] + if !ok { + //TODO 处理错误 + return + } + + for i, wa := range plan.waittings { + if !wa.TryReady(stream) { + continue + } + + if !wa.IsAllReady() { + return + } + + plan.waittings = lo.RemoveAt(plan.waittings, i) + wa.Complete() + return + } + + plan.readys[stream.ID] = stream +} + +func (s *Switch) WaitStreams(planID PlanID, streamIDs ...StreamID) ([]Stream, error) { + + s.lock.Lock() + defer s.lock.Unlock() + + plan, ok := s.plannings[planID] + if !ok { + return nil, ErrPlanNotFound + } + + allReady := true + readys := make([]Stream, len(streamIDs)) + for i, id := range streamIDs { + str, ok := plan.readys[id] + if !ok { + allReady = false + continue + } + + readys[i] = str + delete(plan.readys, id) + } + + if allReady { + return readys, nil + } + + callback := future.NewSetValue[[]Stream]() + + plan.waittings = append(plan.waittings, &Watting{ + WaitIDs: streamIDs, + Readys: readys, + Callback: callback, + }) + + return callback.WaitValue(context.TODO()) +} + +func (s *Switch) AddResultValue(planID PlanID, rets ...ResultKV) { + s.lock.Lock() + defer s.lock.Unlock() + + plan, ok := s.plannings[planID] + if !ok { + return + } + + for _, ret := range rets { + plan.resultValues[ret.Key] = ret.Value + } +} diff --git a/common/pkgs/mq/agent/io.go b/common/pkgs/mq/agent/io.go new file mode 100644 index 0000000..4a79cd8 --- /dev/null +++ b/common/pkgs/mq/agent/io.go @@ -0,0 +1,120 @@ +package agent + +import ( + "gitlink.org.cn/cloudream/common/pkgs/mq" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +type IOService interface { + SetupIOPlan(msg *SetupIOPlan) (*SetupIOPlanResp, *mq.CodeMessage) + + StartIOPlan(msg *StartIOPlan) (*StartIOPlanResp, *mq.CodeMessage) + + WaitIOPlan(msg *WaitIOPlan) (*WaitIOPlanResp, *mq.CodeMessage) + + CancelIOPlan(msg *CancelIOPlan) (*CancelIOPlanResp, *mq.CodeMessage) +} + +// 设置io计划 +var _ = Register(Service.SetupIOPlan) + +type SetupIOPlan struct { + mq.MessageBodyBase + Plan ioswitch.Plan `json:"plan"` +} +type SetupIOPlanResp struct { + mq.MessageBodyBase +} + +func NewSetupIOPlan(plan ioswitch.Plan) *SetupIOPlan { + return &SetupIOPlan{ + Plan: plan, + } +} +func NewSetupIOPlanResp() *SetupIOPlanResp { + return &SetupIOPlanResp{} +} +func (client *Client) SetupIOPlan(msg *SetupIOPlan, opts ...mq.RequestOption) (*SetupIOPlanResp, error) { + return mq.Request(Service.SetupIOPlan, client.rabbitCli, msg, opts...) +} + +// 启动io计划 +var _ = Register(Service.StartIOPlan) + +type StartIOPlan struct { + mq.MessageBodyBase + PlanID ioswitch.PlanID `json:"planID"` +} +type StartIOPlanResp struct { + mq.MessageBodyBase + TaskID string `json:"taskID"` +} + +func NewStartIOPlan(planID ioswitch.PlanID) *StartIOPlan { + return &StartIOPlan{ + PlanID: planID, + } +} +func NewStartIOPlanResp(taskID string) *StartIOPlanResp { + return &StartIOPlanResp{ + TaskID: taskID, + } +} +func (client *Client) StartIOPlan(msg *StartIOPlan, opts ...mq.RequestOption) (*StartIOPlanResp, error) { + return mq.Request(Service.StartIOPlan, client.rabbitCli, msg, opts...) +} + +// 启动io计划 +var _ = Register(Service.WaitIOPlan) + +type WaitIOPlan struct { + mq.MessageBodyBase + TaskID string `json:"taskID"` + WaitTimeoutMs int64 `json:"waitTimeout"` +} +type WaitIOPlanResp struct { + mq.MessageBodyBase + IsComplete bool `json:"isComplete"` + Error string `json:"error"` + Result ioswitch.PlanResult `json:"result"` +} + +func NewWaitIOPlan(taskID string, waitTimeoutMs int64) *WaitIOPlan { + return &WaitIOPlan{ + TaskID: taskID, + WaitTimeoutMs: waitTimeoutMs, + } +} +func NewWaitIOPlanResp(isComplete bool, err string, result ioswitch.PlanResult) *WaitIOPlanResp { + return &WaitIOPlanResp{ + IsComplete: isComplete, + Error: err, + Result: result, + } +} +func (client *Client) WaitIOPlan(msg *WaitIOPlan, opts ...mq.RequestOption) (*WaitIOPlanResp, error) { + return mq.Request(Service.WaitIOPlan, client.rabbitCli, msg, opts...) +} + +// 取消io计划 +var _ = Register(Service.CancelIOPlan) + +type CancelIOPlan struct { + mq.MessageBodyBase + PlanID ioswitch.PlanID `json:"planID"` +} +type CancelIOPlanResp struct { + mq.MessageBodyBase +} + +func NewCancelIOPlan(planID ioswitch.PlanID) *CancelIOPlan { + return &CancelIOPlan{ + PlanID: planID, + } +} +func NewCancelIOPlanResp() *CancelIOPlanResp { + return &CancelIOPlanResp{} +} +func (client *Client) CancelIOPlan(msg *CancelIOPlan, opts ...mq.RequestOption) (*CancelIOPlanResp, error) { + return mq.Request(Service.CancelIOPlan, client.rabbitCli, msg, opts...) +} diff --git a/common/pkgs/mq/agent/server.go b/common/pkgs/mq/agent/server.go index 8819c17..40b8706 100644 --- a/common/pkgs/mq/agent/server.go +++ b/common/pkgs/mq/agent/server.go @@ -6,6 +6,8 @@ import ( ) type Service interface { + IOService + ObjectService StorageService