Browse Source

支持通过本地IPFS上传下载文件

gitlink
Sydonian 2 years ago
parent
commit
9b8fd5ecbd
7 changed files with 121 additions and 102 deletions
  1. +0
    -93
      command_service.go
  2. +2
    -1
      internal/config/config.go
  3. +21
    -0
      internal/services/cmd/object.go
  4. +13
    -0
      internal/services/cmd/service.go
  5. +77
    -3
      internal/services/cmd/storage.go
  6. +2
    -2
      internal/services/grpc/grpc_service.go
  7. +6
    -3
      main.go

+ 0
- 93
command_service.go View File

@@ -1,93 +0,0 @@
package main

import (
"io"
"os"
"path/filepath"

log "github.com/sirupsen/logrus"
"gitlink.org.cn/cloudream/agent/internal/config"
"gitlink.org.cn/cloudream/utils"

coorcli "gitlink.org.cn/cloudream/rabbitmq/client/coordinator"
ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
"gitlink.org.cn/cloudream/utils/consts/errorcode"
myio "gitlink.org.cn/cloudream/utils/io"
"gitlink.org.cn/cloudream/utils/ipfs"
)

type CommandService struct {
ipfs *ipfs.IPFS
}

func NewCommandService(ipfs *ipfs.IPFS) *CommandService {
return &CommandService{
ipfs: ipfs,
}
}

func (service *CommandService) RepMove(msg *agtmsg.RepMoveCommand) *agtmsg.AgentMoveResp {
outFileName := utils.MakeMoveOperationFileName(msg.Body.ObjectID, msg.Body.UserID)
outFileDir := filepath.Join(config.Cfg().StorageBaseDir, msg.Body.Directory)
outFilePath := filepath.Join(outFileDir, outFileName)

err := os.MkdirAll(outFileDir, 0644)
if err != nil {
log.Warnf("create file directory %s failed, err: %s", outFileDir, err.Error())
return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "create local file directory failed")
}

outFile, err := os.Create(outFilePath)
if err != nil {
log.Warnf("create file %s failed, err: %s", outFilePath, err.Error())
return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "create local file failed")
}
defer outFile.Close()

hashs := msg.Body.Hashs
fileHash := hashs[0]
ipfsRd, err := service.ipfs.OpenRead(fileHash)
if err != nil {
log.Warnf("read ipfs file %s failed, err: %s", fileHash, err.Error())
return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "read ipfs file failed")
}
defer ipfsRd.Close()

buf := make([]byte, 1024)
for {
readCnt, err := ipfsRd.Read(buf)

if readCnt > 0 {
err = myio.WriteAll(outFile, buf[:readCnt])
if err != nil {
log.Warnf("write data to file %s failed, err: %s", outFilePath, err.Error())
return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "write data to file failed")
}
}

// 文件读取完毕
if err == io.EOF {
break
}

if err != nil {
log.Warnf("read ipfs file %s data failed, err: %s", fileHash, err.Error())
return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "read ipfs file data failed")
}
}

//向coor报告临时缓存hash
coorClient, err := coorcli.NewCoordinatorClient(&config.Cfg().RabbitMQ)
if err != nil {
log.Warnf("new coordinator client failed, err: %s", err.Error())
return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "new coordinator client failed")
}
defer coorClient.Close()

// TODO 这里更新失败残留下的文件是否要删除?
coorClient.TempCacheReport(coormsg.NewTempCacheReportBody(config.Cfg().ID, hashs))

return ramsg.ReplyOK(agtmsg.NewAgentMoveRespBody())
}

+ 2
- 1
internal/config/config.go View File

@@ -3,6 +3,7 @@ package config
import (
racfg "gitlink.org.cn/cloudream/rabbitmq/config"
c "gitlink.org.cn/cloudream/utils/config"
"gitlink.org.cn/cloudream/utils/ipfs"
log "gitlink.org.cn/cloudream/utils/logger"
)

@@ -10,10 +11,10 @@ type Config struct {
ID int `json:"id"`
GRPCListenAddress string `json:"grpcListenAddress"`
LocalIP string `json:"localIP"`
IPFSPort int `json:"ipfsPort"`
StorageBaseDir string `json:"storageBaseDir"`
Logger log.Config `json:"logger"`
RabbitMQ racfg.Config `json:"rabbitMQ"`
IPFS ipfs.Config `json:"ipfs"`
}

var cfg Config


+ 21
- 0
internal/services/cmd/object.go View File

@@ -0,0 +1,21 @@
package cmd

import (
log "github.com/sirupsen/logrus"
ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
"gitlink.org.cn/cloudream/utils/consts/errorcode"
)

func (svc *Service) PinObject(msg *agtmsg.PinObject) *agtmsg.PinObjectResp {
log.WithField("FileHash", msg.Body.FileHash).Debugf("pin object")

err := svc.ipfs.Pin(msg.Body.FileHash)
if err != nil {
log.WithField("FileHash", msg.Body.FileHash).
Warnf("pin object failed, err: %s", err.Error())
return ramsg.ReplyFailed[agtmsg.PinObjectResp](errorcode.OPERATION_FAILED, "pin object failed")
}

return ramsg.ReplyOK(agtmsg.NewPinObjectRespBody())
}

+ 13
- 0
internal/services/cmd/service.go View File

@@ -0,0 +1,13 @@
package cmd

import "gitlink.org.cn/cloudream/utils/ipfs"

type Service struct {
ipfs *ipfs.IPFS
}

func NewService(ipfs *ipfs.IPFS) *Service {
return &Service{
ipfs: ipfs,
}
}

command_service_ec.go → internal/services/cmd/storage.go View File

@@ -1,16 +1,90 @@
package main
package cmd

import (
"fmt"
"io"
"os"
"path/filepath"
"sync"

log "github.com/sirupsen/logrus"
"gitlink.org.cn/cloudream/agent/internal/config"
"gitlink.org.cn/cloudream/ec"
"gitlink.org.cn/cloudream/utils"

coorcli "gitlink.org.cn/cloudream/rabbitmq/client/coordinator"
ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
"gitlink.org.cn/cloudream/utils/consts/errorcode"
myio "gitlink.org.cn/cloudream/utils/io"
)

func (service *CommandService) ECMove(msg *agtmsg.ECMoveCommand) *agtmsg.AgentMoveResp {
func (service *Service) RepMove(msg *agtmsg.RepMoveCommand) *agtmsg.AgentMoveResp {
outFileName := utils.MakeMoveOperationFileName(msg.Body.ObjectID, msg.Body.UserID)
outFileDir := filepath.Join(config.Cfg().StorageBaseDir, msg.Body.Directory)
outFilePath := filepath.Join(outFileDir, outFileName)

err := os.MkdirAll(outFileDir, 0644)
if err != nil {
log.Warnf("create file directory %s failed, err: %s", outFileDir, err.Error())
return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "create local file directory failed")
}

outFile, err := os.Create(outFilePath)
if err != nil {
log.Warnf("create file %s failed, err: %s", outFilePath, err.Error())
return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "create local file failed")
}
defer outFile.Close()

hashs := msg.Body.Hashs
fileHash := hashs[0]
ipfsRd, err := service.ipfs.OpenRead(fileHash)
if err != nil {
log.Warnf("read ipfs file %s failed, err: %s", fileHash, err.Error())
return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "read ipfs file failed")
}
defer ipfsRd.Close()

buf := make([]byte, 1024)
for {
readCnt, err := ipfsRd.Read(buf)

if readCnt > 0 {
err = myio.WriteAll(outFile, buf[:readCnt])
if err != nil {
log.Warnf("write data to file %s failed, err: %s", outFilePath, err.Error())
return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "write data to file failed")
}
}

// 文件读取完毕
if err == io.EOF {
break
}

if err != nil {
log.Warnf("read ipfs file %s data failed, err: %s", fileHash, err.Error())
return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "read ipfs file data failed")
}
}

//向coor报告临时缓存hash
coorClient, err := coorcli.NewCoordinatorClient(&config.Cfg().RabbitMQ)
if err != nil {
log.Warnf("new coordinator client failed, err: %s", err.Error())
return ramsg.ReplyFailed[agtmsg.AgentMoveResp](errorcode.OPERATION_FAILED, "new coordinator client failed")
}
defer coorClient.Close()

// TODO 这里更新失败残留下的文件是否要删除?
coorClient.TempCacheReport(coormsg.NewTempCacheReportBody(config.Cfg().ID, hashs))

return ramsg.ReplyOK(agtmsg.NewAgentMoveRespBody())
}

func (service *Service) ECMove(msg *agtmsg.ECMoveCommand) *agtmsg.AgentMoveResp {
panic("not implement yet!")
/*
wg := sync.WaitGroup{}
@@ -105,7 +179,7 @@ func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int
}
}

func (service *CommandService) get(blockHash string, getBuf chan []byte, numPacket int64) {
func (service *Service) get(blockHash string, getBuf chan []byte, numPacket int64) {
/*
data := CatIPFS(blockHash)
for i := 0; int64(i) < numPacket; i++ {

grpc_service.go → internal/services/grpc/grpc_service.go View File

@@ -1,4 +1,4 @@
package main
package grpc

import (
"fmt"
@@ -15,7 +15,7 @@ type GRPCService struct {
ipfs *ipfs.IPFS
}

func NewGPRCService(ipfs *ipfs.IPFS) *GRPCService {
func NewService(ipfs *ipfs.IPFS) *GRPCService {
return &GRPCService{
ipfs: ipfs,
}

+ 6
- 3
main.go View File

@@ -15,6 +15,9 @@ import (
"google.golang.org/grpc"

rasvr "gitlink.org.cn/cloudream/rabbitmq/server/agent"

cmdsvc "gitlink.org.cn/cloudream/agent/internal/services/cmd"
grpcsvc "gitlink.org.cn/cloudream/agent/internal/services/grpc"
)

// TODO 此数据是否在运行时会发生变化?
@@ -36,7 +39,7 @@ func main() {
os.Exit(1)
}

ipfs, err := ipfs.NewIPFS(config.Cfg().IPFSPort)
ipfs, err := ipfs.NewIPFS(&config.Cfg().IPFS)
if err != nil {
log.Fatalf("new ipfs failed, err: %s", err.Error())
}
@@ -47,7 +50,7 @@ func main() {

// 启动命令服务器
// TODO 需要设计AgentID持久化机制
agtSvr, err := rasvr.NewAgentServer(NewCommandService(ipfs), config.Cfg().ID, &config.Cfg().RabbitMQ)
agtSvr, err := rasvr.NewAgentServer(cmdsvc.NewService(ipfs), config.Cfg().ID, &config.Cfg().RabbitMQ)
if err != nil {
log.Fatalf("new agent server failed, err: %s", err.Error())
}
@@ -66,7 +69,7 @@ func main() {
}

s := grpc.NewServer()
agentserver.RegisterFileTransportServer(s, NewGPRCService(ipfs))
agentserver.RegisterFileTransportServer(s, grpcsvc.NewService(ipfs))
s.Serve(lis)

wg.Wait()


Loading…
Cancel
Save