From 9b8fd5ecbdb4b128791285611f623cd380a93d4a Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Sat, 6 May 2023 10:06:56 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E9=80=9A=E8=BF=87=E6=9C=AC?= =?UTF-8?q?=E5=9C=B0IPFS=E4=B8=8A=E4=BC=A0=E4=B8=8B=E8=BD=BD=E6=96=87?= =?UTF-8?q?=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- command_service.go | 93 ------------------- internal/config/config.go | 3 +- internal/services/cmd/object.go | 21 +++++ internal/services/cmd/service.go | 13 +++ .../services/cmd/storage.go | 80 +++++++++++++++- .../services/grpc/grpc_service.go | 4 +- main.go | 9 +- 7 files changed, 121 insertions(+), 102 deletions(-) delete mode 100644 command_service.go create mode 100644 internal/services/cmd/object.go create mode 100644 internal/services/cmd/service.go rename command_service_ec.go => internal/services/cmd/storage.go (56%) rename grpc_service.go => internal/services/grpc/grpc_service.go (98%) diff --git a/command_service.go b/command_service.go deleted file mode 100644 index d3bce9d..0000000 --- a/command_service.go +++ /dev/null @@ -1,93 +0,0 @@ -package main - -import ( - "io" - "os" - "path/filepath" - - log "github.com/sirupsen/logrus" - "gitlink.org.cn/cloudream/agent/internal/config" - "gitlink.org.cn/cloudream/utils" - - coorcli "gitlink.org.cn/cloudream/rabbitmq/client/coordinator" - ramsg "gitlink.org.cn/cloudream/rabbitmq/message" - agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" - coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator" - "gitlink.org.cn/cloudream/utils/consts/errorcode" - myio "gitlink.org.cn/cloudream/utils/io" - "gitlink.org.cn/cloudream/utils/ipfs" -) - -type CommandService struct { - ipfs *ipfs.IPFS -} - -func NewCommandService(ipfs *ipfs.IPFS) *CommandService { - return &CommandService{ - ipfs: ipfs, - } -} - -func (service *CommandService) RepMove(msg *agtmsg.RepMoveCommand) *agtmsg.AgentMoveResp { - outFileName := utils.MakeMoveOperationFileName(msg.Body.ObjectID, msg.Body.UserID) - outFileDir := filepath.Join(config.Cfg().StorageBaseDir, msg.Body.Directory) - outFilePath := filepath.Join(outFileDir, outFileName) - - err := os.MkdirAll(outFileDir, 0644) - if err != nil { - log.Warnf("create file directory %s failed, err: %s", outFileDir, err.Error()) - return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "create local file directory failed") - } - - outFile, err := os.Create(outFilePath) - if err != nil { - log.Warnf("create file %s failed, err: %s", outFilePath, err.Error()) - return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "create local file failed") - } - defer outFile.Close() - - hashs := msg.Body.Hashs - fileHash := hashs[0] - ipfsRd, err := service.ipfs.OpenRead(fileHash) - if err != nil { - log.Warnf("read ipfs file %s failed, err: %s", fileHash, err.Error()) - return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "read ipfs file failed") - } - defer ipfsRd.Close() - - buf := make([]byte, 1024) - for { - readCnt, err := ipfsRd.Read(buf) - - if readCnt > 0 { - err = myio.WriteAll(outFile, buf[:readCnt]) - if err != nil { - log.Warnf("write data to file %s failed, err: %s", outFilePath, err.Error()) - return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "write data to file failed") - } - } - - // 文件读取完毕 - if err == io.EOF { - break - } - - if err != nil { - log.Warnf("read ipfs file %s data failed, err: %s", fileHash, err.Error()) - return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "read ipfs file data failed") - } - } - - //向coor报告临时缓存hash - coorClient, err := coorcli.NewCoordinatorClient(&config.Cfg().RabbitMQ) - if err != nil { - log.Warnf("new coordinator client failed, err: %s", err.Error()) - return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "new coordinator client failed") - } - defer coorClient.Close() - - // TODO 这里更新失败残留下的文件是否要删除? - coorClient.TempCacheReport(coormsg.NewTempCacheReportBody(config.Cfg().ID, hashs)) - - return ramsg.ReplyOK(agtmsg.NewAgentMoveRespBody()) -} diff --git a/internal/config/config.go b/internal/config/config.go index 5e54aba..61b6e51 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,6 +3,7 @@ package config import ( racfg "gitlink.org.cn/cloudream/rabbitmq/config" c "gitlink.org.cn/cloudream/utils/config" + "gitlink.org.cn/cloudream/utils/ipfs" log "gitlink.org.cn/cloudream/utils/logger" ) @@ -10,10 +11,10 @@ type Config struct { ID int `json:"id"` GRPCListenAddress string `json:"grpcListenAddress"` LocalIP string `json:"localIP"` - IPFSPort int `json:"ipfsPort"` StorageBaseDir string `json:"storageBaseDir"` Logger log.Config `json:"logger"` RabbitMQ racfg.Config `json:"rabbitMQ"` + IPFS ipfs.Config `json:"ipfs"` } var cfg Config diff --git a/internal/services/cmd/object.go b/internal/services/cmd/object.go new file mode 100644 index 0000000..5d7d4e7 --- /dev/null +++ b/internal/services/cmd/object.go @@ -0,0 +1,21 @@ +package cmd + +import ( + log "github.com/sirupsen/logrus" + ramsg "gitlink.org.cn/cloudream/rabbitmq/message" + agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" + "gitlink.org.cn/cloudream/utils/consts/errorcode" +) + +func (svc *Service) PinObject(msg *agtmsg.PinObject) *agtmsg.PinObjectResp { + log.WithField("FileHash", msg.Body.FileHash).Debugf("pin object") + + err := svc.ipfs.Pin(msg.Body.FileHash) + if err != nil { + log.WithField("FileHash", msg.Body.FileHash). + Warnf("pin object failed, err: %s", err.Error()) + return ramsg.ReplyFailed[agtmsg.PinObjectResp](errorcode.OPERATION_FAILED, "pin object failed") + } + + return ramsg.ReplyOK(agtmsg.NewPinObjectRespBody()) +} diff --git a/internal/services/cmd/service.go b/internal/services/cmd/service.go new file mode 100644 index 0000000..8e73ba4 --- /dev/null +++ b/internal/services/cmd/service.go @@ -0,0 +1,13 @@ +package cmd + +import "gitlink.org.cn/cloudream/utils/ipfs" + +type Service struct { + ipfs *ipfs.IPFS +} + +func NewService(ipfs *ipfs.IPFS) *Service { + return &Service{ + ipfs: ipfs, + } +} diff --git a/command_service_ec.go b/internal/services/cmd/storage.go similarity index 56% rename from command_service_ec.go rename to internal/services/cmd/storage.go index 9d05856..beab88a 100644 --- a/command_service_ec.go +++ b/internal/services/cmd/storage.go @@ -1,16 +1,90 @@ -package main +package cmd import ( "fmt" + "io" "os" "path/filepath" "sync" + log "github.com/sirupsen/logrus" + "gitlink.org.cn/cloudream/agent/internal/config" "gitlink.org.cn/cloudream/ec" + "gitlink.org.cn/cloudream/utils" + + coorcli "gitlink.org.cn/cloudream/rabbitmq/client/coordinator" + ramsg "gitlink.org.cn/cloudream/rabbitmq/message" agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" + coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator" + "gitlink.org.cn/cloudream/utils/consts/errorcode" + myio "gitlink.org.cn/cloudream/utils/io" ) -func (service *CommandService) ECMove(msg *agtmsg.ECMoveCommand) *agtmsg.AgentMoveResp { +func (service *Service) RepMove(msg *agtmsg.RepMoveCommand) *agtmsg.AgentMoveResp { + outFileName := utils.MakeMoveOperationFileName(msg.Body.ObjectID, msg.Body.UserID) + outFileDir := filepath.Join(config.Cfg().StorageBaseDir, msg.Body.Directory) + outFilePath := filepath.Join(outFileDir, outFileName) + + err := os.MkdirAll(outFileDir, 0644) + if err != nil { + log.Warnf("create file directory %s failed, err: %s", outFileDir, err.Error()) + return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "create local file directory failed") + } + + outFile, err := os.Create(outFilePath) + if err != nil { + log.Warnf("create file %s failed, err: %s", outFilePath, err.Error()) + return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "create local file failed") + } + defer outFile.Close() + + hashs := msg.Body.Hashs + fileHash := hashs[0] + ipfsRd, err := service.ipfs.OpenRead(fileHash) + if err != nil { + log.Warnf("read ipfs file %s failed, err: %s", fileHash, err.Error()) + return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "read ipfs file failed") + } + defer ipfsRd.Close() + + buf := make([]byte, 1024) + for { + readCnt, err := ipfsRd.Read(buf) + + if readCnt > 0 { + err = myio.WriteAll(outFile, buf[:readCnt]) + if err != nil { + log.Warnf("write data to file %s failed, err: %s", outFilePath, err.Error()) + return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "write data to file failed") + } + } + + // 文件读取完毕 + if err == io.EOF { + break + } + + if err != nil { + log.Warnf("read ipfs file %s data failed, err: %s", fileHash, err.Error()) + return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "read ipfs file data failed") + } + } + + //向coor报告临时缓存hash + coorClient, err := coorcli.NewCoordinatorClient(&config.Cfg().RabbitMQ) + if err != nil { + log.Warnf("new coordinator client failed, err: %s", err.Error()) + return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "new coordinator client failed") + } + defer coorClient.Close() + + // TODO 这里更新失败残留下的文件是否要删除? + coorClient.TempCacheReport(coormsg.NewTempCacheReportBody(config.Cfg().ID, hashs)) + + return ramsg.ReplyOK(agtmsg.NewAgentMoveRespBody()) +} + +func (service *Service) ECMove(msg *agtmsg.ECMoveCommand) *agtmsg.AgentMoveResp { panic("not implement yet!") /* wg := sync.WaitGroup{} @@ -105,7 +179,7 @@ func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int } } -func (service *CommandService) get(blockHash string, getBuf chan []byte, numPacket int64) { +func (service *Service) get(blockHash string, getBuf chan []byte, numPacket int64) { /* data := CatIPFS(blockHash) for i := 0; int64(i) < numPacket; i++ { diff --git a/grpc_service.go b/internal/services/grpc/grpc_service.go similarity index 98% rename from grpc_service.go rename to internal/services/grpc/grpc_service.go index f887212..8178a7c 100644 --- a/grpc_service.go +++ b/internal/services/grpc/grpc_service.go @@ -1,4 +1,4 @@ -package main +package grpc import ( "fmt" @@ -15,7 +15,7 @@ type GRPCService struct { ipfs *ipfs.IPFS } -func NewGPRCService(ipfs *ipfs.IPFS) *GRPCService { +func NewService(ipfs *ipfs.IPFS) *GRPCService { return &GRPCService{ ipfs: ipfs, } diff --git a/main.go b/main.go index 4cffcaf..33c5c7d 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,9 @@ import ( "google.golang.org/grpc" rasvr "gitlink.org.cn/cloudream/rabbitmq/server/agent" + + cmdsvc "gitlink.org.cn/cloudream/agent/internal/services/cmd" + grpcsvc "gitlink.org.cn/cloudream/agent/internal/services/grpc" ) // TODO 此数据是否在运行时会发生变化? @@ -36,7 +39,7 @@ func main() { os.Exit(1) } - ipfs, err := ipfs.NewIPFS(config.Cfg().IPFSPort) + ipfs, err := ipfs.NewIPFS(&config.Cfg().IPFS) if err != nil { log.Fatalf("new ipfs failed, err: %s", err.Error()) } @@ -47,7 +50,7 @@ func main() { // 启动命令服务器 // TODO 需要设计AgentID持久化机制 - agtSvr, err := rasvr.NewAgentServer(NewCommandService(ipfs), config.Cfg().ID, &config.Cfg().RabbitMQ) + agtSvr, err := rasvr.NewAgentServer(cmdsvc.NewService(ipfs), config.Cfg().ID, &config.Cfg().RabbitMQ) if err != nil { log.Fatalf("new agent server failed, err: %s", err.Error()) } @@ -66,7 +69,7 @@ func main() { } s := grpc.NewServer() - agentserver.RegisterFileTransportServer(s, NewGPRCService(ipfs)) + agentserver.RegisterFileTransportServer(s, grpcsvc.NewService(ipfs)) s.Serve(lis) wg.Wait()