Browse Source

抽出Message公用的结构体;优化ObjectRep相关的表结构

gitlink
Sydonian 2 years ago
parent
commit
8e0b85bf10
6 changed files with 58 additions and 56 deletions
  1. +4
    -4
      internal/cmdline/commandline.go
  2. +4
    -4
      internal/cmdline/object.go
  3. +8
    -8
      internal/config/config.go
  4. +7
    -7
      internal/services/client_command_ec.go
  5. +22
    -14
      internal/services/object.go
  6. +13
    -19
      internal/services/storage.go

+ 4
- 4
internal/cmdline/commandline.go View File

@@ -39,13 +39,13 @@ func (c *Commandline) DispatchCommand(cmd string, args []string) {
fmt.Printf("invalid bucket id %s, err: %s", args[1], err.Error())
os.Exit(1)
}
repNum, _ := strconv.Atoi(args[3])
if repNum <= 0 || repNum > config.Cfg().MaxReplicateNumber {
fmt.Printf("replicate number should not be more than %d", config.Cfg().MaxReplicateNumber)
repCount, _ := strconv.Atoi(args[3])
if repCount <= 0 || repCount > config.Cfg().MaxRepCount {
fmt.Printf("replicate number should not be more than %d", config.Cfg().MaxRepCount)
os.Exit(1)
}

if err := c.RepWrite(args[0], bucketID, args[2], repNum); err != nil {
if err := c.RepWrite(args[0], bucketID, args[2], repCount); err != nil {
fmt.Printf("rep write failed, err: %s", err.Error())
os.Exit(1)
}


+ 4
- 4
internal/cmdline/object.go View File

@@ -21,10 +21,10 @@ func (c *Commandline) ListBucketObjects(bucketID int) error {
fmt.Printf("Find %d objects in bucket %d for user %d:\n", len(objects), bucketID, userID)

tb := table.NewWriter()
tb.AppendHeader(table.Row{"ID", "Name", "Size", "BucketID", "State", "Redundancy", "NumRep", "ECName"})
tb.AppendHeader(table.Row{"ID", "Name", "Size", "BucketID", "State", "Redundancy"})

for _, obj := range objects {
tb.AppendRow(table.Row{obj.ObjectID, obj.Name, obj.BucketID, obj.State, obj.FileSizeInBytes, obj.Redundancy, obj.NumRep, obj.ECName})
tb.AppendRow(table.Row{obj.ObjectID, obj.Name, obj.BucketID, obj.State, obj.FileSize, obj.Redundancy})
}

fmt.Print(tb.Render())
@@ -68,7 +68,7 @@ func (c *Commandline) Read(localFilePath string, objectID int) error {
return nil
}

func (c *Commandline) RepWrite(localFilePath string, bucketID int, objectName string, repNum int) error {
func (c *Commandline) RepWrite(localFilePath string, bucketID int, objectName string, repCount int) error {
file, err := os.Open(localFilePath)
if err != nil {
return fmt.Errorf("open file %s failed, err: %w", localFilePath, err)
@@ -81,7 +81,7 @@ func (c *Commandline) RepWrite(localFilePath string, bucketID int, objectName st
}
fileSize := fileInfo.Size()

err = services.ObjectSvc(c.svc).UploadRepObject(0, bucketID, objectName, file, fileSize, repNum)
err = services.ObjectSvc(c.svc).UploadRepObject(0, bucketID, objectName, file, fileSize, repCount)
if err != nil {
return fmt.Errorf("upload file data failed, err: %w", err)
}


+ 8
- 8
internal/config/config.go View File

@@ -8,14 +8,14 @@ import (
)

type Config struct {
GRPCPort int `json:"grpcPort"`
GRCPPacketSize int64 `json:"grpcPacketSize"`
MaxReplicateNumber int `json:"maxReplicateNumber"`
LocalIP string `json:"localIP"`
ExternalIP string `json:"externalIP"`
Logger logger.Config `json:"logger"`
RabbitMQ racfg.Config `json:"rabbitMQ"`
IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon
GRPCPort int `json:"grpcPort"`
GRCPPacketSize int64 `json:"grpcPacketSize"`
MaxRepCount int `json:"maxRepCount"`
LocalIP string `json:"localIP"`
ExternalIP string `json:"externalIP"`
Logger logger.Config `json:"logger"`
RabbitMQ racfg.Config `json:"rabbitMQ"`
IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon
}

var cfg Config


+ 7
- 7
internal/services/client_command_ec.go View File

@@ -26,7 +26,7 @@ func EcWrite(localFilePath string, bucketID int, objectName string, ecName strin
if err != nil {
return fmt.Errorf("get file %s state failed, err: %w", localFilePath, err)
}
fileSizeInBytes := fileInfo.Size()
fileSize := fileInfo.Size()

//调用纠删码库,获取编码参数及生成矩阵
ecPolicies := *utils.GetEcPolicy()
@@ -45,7 +45,7 @@ func EcWrite(localFilePath string, bucketID int, objectName string, ecName strin
var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} //2应替换为ecK,3应替换为ecN

//计算每个块的packet数
numPacket := (fileSizeInBytes + int64(ecK)*config.Cfg().GRCPPacketSize - 1) / (int64(ecK) * config.Cfg().GRCPPacketSize)
numPacket := (fileSize + int64(ecK)*config.Cfg().GRCPPacketSize - 1) / (int64(ecK) * config.Cfg().GRCPPacketSize)
fmt.Println(numPacket)

userId := 0
@@ -56,7 +56,7 @@ func EcWrite(localFilePath string, bucketID int, objectName string, ecName strin
defer coorClient.Close()

//发送写请求,请求Coor分配写入节点Ip
ecWriteResp, err := coorClient.ECWrite(bucketName, objectName, fileSizeInBytes, ecName, userId)
ecWriteResp, err := coorClient.ECWrite(bucketName, objectName, fileSize, ecName, userId)
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}
@@ -75,7 +75,7 @@ func EcWrite(localFilePath string, bucketID int, objectName string, ecName strin
}
hashs := make([]string, ecN)
//正式开始写入
go load(localFilePath, loadBufs[:ecN], ecK, numPacket*int64(ecK), fileSizeInBytes) //从本地文件系统加载数据
go load(localFilePath, loadBufs[:ecN], ecK, numPacket*int64(ecK), fileSize) //从本地文件系统加载数据
go encode(loadBufs[:ecN], encodeBufs[:ecN], ecK, coefs, numPacket)

var wg sync.WaitGroup
@@ -99,7 +99,7 @@ func EcWrite(localFilePath string, bucketID int, objectName string, ecName strin
*/
}

func load(localFilePath string, loadBufs []chan []byte, ecK int, totalNumPacket int64, fileSizeInBytes int64) {
func load(localFilePath string, loadBufs []chan []byte, ecK int, totalNumPacket int64, fileSize int64) {
fmt.Println("load " + localFilePath)
file, _ := os.Open(localFilePath)

@@ -318,7 +318,7 @@ func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *syn
wg.Done()
}

func ecRead(fileSizeInBytes int64, nodeIPs []string, blockHashs []string, blockIds []int, ecName string, localFilePath string) {
func ecRead(fileSize int64, nodeIPs []string, blockHashs []string, blockIds []int, ecName string, localFilePath string) {
//根据ecName获得以下参数
wg := sync.WaitGroup{}
ecPolicies := *utils.GetEcPolicy()
@@ -328,7 +328,7 @@ func ecRead(fileSizeInBytes int64, nodeIPs []string, blockHashs []string, blockI
ecN := ecPolicy.GetN()
var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} //2应替换为ecK,3应替换为ecN

numPacket := (fileSizeInBytes + int64(ecK)*config.Cfg().GRCPPacketSize - 1) / (int64(ecK) * config.Cfg().GRCPPacketSize)
numPacket := (fileSize + int64(ecK)*config.Cfg().GRCPPacketSize - 1) / (int64(ecK) * config.Cfg().GRCPPacketSize)
fmt.Println(numPacket)
//创建channel
getBufs := make([]chan []byte, ecN)


+ 22
- 14
internal/services/object.go View File

@@ -9,12 +9,14 @@ import (
"gitlink.org.cn/cloudream/db/model"
agentcaller "gitlink.org.cn/cloudream/proto"
agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
"gitlink.org.cn/cloudream/utils/consts"
mygrpc "gitlink.org.cn/cloudream/utils/grpc"
myio "gitlink.org.cn/cloudream/utils/io"
log "gitlink.org.cn/cloudream/utils/logger"
serder "gitlink.org.cn/cloudream/utils/serder"
mysort "gitlink.org.cn/cloudream/utils/sort"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@@ -36,7 +38,7 @@ func (svc *ObjectService) GetObject(userID int, objectID int) (model.Object, err
}

func (svc *ObjectService) DownloadObject(userID int, objectID int) (io.ReadCloser, error) {
preDownloadResp, err := svc.coordinator.PreDownloadObject(coormsg.NewReadCommandBody(objectID, userID, config.Cfg().ExternalIP))
preDownloadResp, err := svc.coordinator.PreDownloadObject(coormsg.NewPreDownloadObjectBody(objectID, userID, config.Cfg().ExternalIP))
if err != nil {
return nil, fmt.Errorf("request to coordinator failed, err: %w", err)
}
@@ -46,22 +48,28 @@ func (svc *ObjectService) DownloadObject(userID int, objectID int) (io.ReadClose

switch preDownloadResp.Body.Redundancy {
case consts.REDUNDANCY_REP:
if len(preDownloadResp.Body.Entries) == 0 {
var repInfo ramsg.RespObjectRepInfo
err := serder.MapToObject(preDownloadResp.Body.RedundancyData.(map[string]any), &repInfo)
if err != nil {
return nil, fmt.Errorf("redundancy data to rep info failed, err: %w", err)
}

if len(repInfo.Nodes) == 0 {
return nil, fmt.Errorf("no node has this file")
}

// 选择下载节点
entry := svc.chooseDownloadNode(preDownloadResp.Body.Entries)
entry := svc.chooseDownloadNode(repInfo.Nodes)

// 如果客户端与节点在同一个地域,则使用内网地址连接节点
nodeIP := entry.NodeExternalIP
nodeIP := entry.ExternalIP
if entry.IsSameLocation {
nodeIP = entry.NodeLocalIP
nodeIP = entry.LocalIP

log.Infof("client and node %d are at the same location, use local ip\n", entry.NodeID)
log.Infof("client and node %d are at the same location, use local ip\n", entry.ID)
}

reader, err := svc.downloadRepObject(nodeIP, entry.FileHash)
reader, err := svc.downloadRepObject(nodeIP, repInfo.FileHash)
if err != nil {
return nil, fmt.Errorf("rep read failed, err: %w", err)
}
@@ -70,7 +78,7 @@ func (svc *ObjectService) DownloadObject(userID int, objectID int) (io.ReadClose

//case consts.REDUNDANCY_EC:
// TODO EC部分的代码要考虑重构
// ecRead(readResp.FileSizeInBytes, readResp.NodeIPs, readResp.Hashes, readResp.BlockIDs, *readResp.ECName)
// ecRead(readResp.FileSize, readResp.NodeIPs, readResp.Hashes, readResp.BlockIDs, *readResp.ECName)
}

return nil, fmt.Errorf("unsupported redundancy type: %s", preDownloadResp.Body.Redundancy)
@@ -79,8 +87,8 @@ func (svc *ObjectService) DownloadObject(userID int, objectID int) (io.ReadClose
// chooseDownloadNode 选择一个下载节点
// 1. 从与当前客户端相同地域的节点中随机选一个
// 2. 没有用的话从所有节点中随机选一个
func (svc *ObjectService) chooseDownloadNode(entries []coormsg.PreDownloadObjectRespEntry) coormsg.PreDownloadObjectRespEntry {
sameLocationEntries := lo.Filter(entries, func(e coormsg.PreDownloadObjectRespEntry, i int) bool { return e.IsSameLocation })
func (svc *ObjectService) chooseDownloadNode(entries []ramsg.RespNode) ramsg.RespNode {
sameLocationEntries := lo.Filter(entries, func(e ramsg.RespNode, i int) bool { return e.IsSameLocation })
if len(sameLocationEntries) > 0 {
return sameLocationEntries[rand.Intn(len(sameLocationEntries))]
}
@@ -132,7 +140,7 @@ func (svc *ObjectService) downloadFromLocalIPFS(fileHash string) (io.ReadCloser,
return reader, nil
}

func (svc *ObjectService) UploadRepObject(userID int, bucketID int, objectName string, file io.ReadCloser, fileSize int64, repNum int) error {
func (svc *ObjectService) UploadRepObject(userID int, bucketID int, objectName string, file io.ReadCloser, fileSize int64, repCount int) error {

//发送写请求,请求Coor分配写入节点Ip
repWriteResp, err := svc.coordinator.PreUploadRepObject(coormsg.NewPreUploadRepObjectBody(bucketID, objectName, fileSize, userID, config.Cfg().ExternalIP))
@@ -182,7 +190,7 @@ func (svc *ObjectService) UploadRepObject(userID int, bucketID int, objectName s
}

// 记录写入的文件的Hash
createObjectResp, err := svc.coordinator.CreateRepObject(coormsg.NewCreateRepObjectBody(bucketID, objectName, fileSize, repNum, userID, uploadedNodeIDs, fileHash))
createObjectResp, err := svc.coordinator.CreateRepObject(coormsg.NewCreateRepObjectBody(bucketID, objectName, fileSize, repCount, userID, uploadedNodeIDs, fileHash))
if err != nil {
return fmt.Errorf("request to coordinator failed, err: %w", err)
}
@@ -264,8 +272,8 @@ func (svc *ObjectService) uploadToLocalIPFS(file io.ReadCloser, nodeID int) (str
// chooseUploadNode 选择一个上传文件的节点
// 1. 从与当前客户端相同地域的节点中随机选一个
// 2. 没有用的话从所有节点中随机选一个
func (svc *ObjectService) chooseUploadNode(nodes []coormsg.PreUploadRespNode) coormsg.PreUploadRespNode {
sameLocationNodes := lo.Filter(nodes, func(e coormsg.PreUploadRespNode, i int) bool { return e.IsSameLocation })
func (svc *ObjectService) chooseUploadNode(nodes []ramsg.RespNode) ramsg.RespNode {
sameLocationNodes := lo.Filter(nodes, func(e ramsg.RespNode, i int) bool { return e.IsSameLocation })
if len(sameLocationNodes) > 0 {
return sameLocationNodes[rand.Intn(len(sameLocationNodes))]
}


+ 13
- 19
internal/services/storage.go View File

@@ -7,7 +7,6 @@ import (
agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
"gitlink.org.cn/cloudream/utils/consts"
)

type StorageService struct {
@@ -35,24 +34,19 @@ func (svc *StorageService) MoveObjectToStorage(userID int, objectID int, storage
}
defer agentClient.Close()

switch preMoveResp.Body.Redundancy {
case consts.REDUNDANCY_REP:
agentMoveResp, err := agentClient.RepMove(agtmsg.NewRepMoveCommandBody(preMoveResp.Body.Directory, preMoveResp.Body.Hashes, objectID, userID, preMoveResp.Body.FileSizeInBytes))
if err != nil {
return fmt.Errorf("request to agent %d failed, err: %w", preMoveResp.Body.NodeID, err)
}
if agentMoveResp.IsFailed() {
return fmt.Errorf("agent %d operation failed, code: %s, messsage: %s", preMoveResp.Body.NodeID, agentMoveResp.ErrorCode, agentMoveResp.ErrorMessage)
}

case consts.REDUNDANCY_EC:
agentMoveResp, err := agentClient.ECMove(agtmsg.NewECMoveCommandBody(preMoveResp.Body.Directory, preMoveResp.Body.Hashes, preMoveResp.Body.IDs, *preMoveResp.Body.ECName, objectID, userID, preMoveResp.Body.FileSizeInBytes))
if err != nil {
return fmt.Errorf("request to agent %d failed, err: %w", preMoveResp.Body.NodeID, err)
}
if agentMoveResp.IsFailed() {
return fmt.Errorf("agent %d operation failed, code: %s, messsage: %s", preMoveResp.Body.NodeID, agentMoveResp.ErrorCode, agentMoveResp.ErrorMessage)
}
agentMoveResp, err := agentClient.MoveObjectToStorage(
agtmsg.NewMoveObjectToStorageBody(preMoveResp.Body.Directory,
objectID,
userID,
preMoveResp.Body.FileSize,
preMoveResp.Body.Redundancy,
preMoveResp.Body.RedundancyData,
))
if err != nil {
return fmt.Errorf("request to agent %d failed, err: %w", preMoveResp.Body.NodeID, err)
}
if agentMoveResp.IsFailed() {
return fmt.Errorf("agent %d operation failed, code: %s, messsage: %s", preMoveResp.Body.NodeID, agentMoveResp.ErrorCode, agentMoveResp.ErrorMessage)
}

moveResp, err := svc.coordinator.MoveObjectToStorage(coormsg.NewMoveObjectToStorageBody(objectID, storageID, userID))


Loading…
Cancel
Save