diff --git a/internal/cmdline/commandline.go b/internal/cmdline/commandline.go
index c73e65d..d54ac91 100644
--- a/internal/cmdline/commandline.go
+++ b/internal/cmdline/commandline.go
@@ -39,13 +39,13 @@ func (c *Commandline) DispatchCommand(cmd string, args []string) {
 			fmt.Printf("invalid bucket id %s, err: %s", args[1], err.Error())
 			os.Exit(1)
 		}
-		repNum, _ := strconv.Atoi(args[3])
-		if repNum <= 0 || repNum > config.Cfg().MaxReplicateNumber {
-			fmt.Printf("replicate number should not be more than %d", config.Cfg().MaxReplicateNumber)
+		repCount, _ := strconv.Atoi(args[3])
+		if repCount <= 0 || repCount > config.Cfg().MaxRepCount {
+			fmt.Printf("replicate number should not be more than %d", config.Cfg().MaxRepCount)
 			os.Exit(1)
 		}
 
-		if err := c.RepWrite(args[0], bucketID, args[2], repNum); err != nil {
+		if err := c.RepWrite(args[0], bucketID, args[2], repCount); err != nil {
 			fmt.Printf("rep write failed, err: %s", err.Error())
 			os.Exit(1)
 		}
diff --git a/internal/cmdline/object.go b/internal/cmdline/object.go
index af151e0..c06ed57 100644
--- a/internal/cmdline/object.go
+++ b/internal/cmdline/object.go
@@ -21,10 +21,10 @@ func (c *Commandline) ListBucketObjects(bucketID int) error {
 	fmt.Printf("Find %d objects in bucket %d for user %d:\n", len(objects), bucketID, userID)
 
 	tb := table.NewWriter()
-	tb.AppendHeader(table.Row{"ID", "Name", "Size", "BucketID", "State", "Redundancy", "NumRep", "ECName"})
+	tb.AppendHeader(table.Row{"ID", "Name", "Size", "BucketID", "State", "Redundancy"})
 	for _, obj := range objects {
-		tb.AppendRow(table.Row{obj.ObjectID, obj.Name, obj.BucketID, obj.State, obj.FileSizeInBytes, obj.Redundancy, obj.NumRep, obj.ECName})
+		tb.AppendRow(table.Row{obj.ObjectID, obj.Name, obj.BucketID, obj.State, obj.FileSize, obj.Redundancy})
 	}
 
 	fmt.Print(tb.Render())
@@ -68,7 +68,7 @@ func (c *Commandline) Read(localFilePath string, objectID int) error {
 	return nil
 }
-func (c *Commandline) RepWrite(localFilePath string, bucketID int, objectName string, repNum int) error {
+func (c *Commandline) RepWrite(localFilePath string, bucketID int, objectName string, repCount int) error {
 	file, err := os.Open(localFilePath)
 	if err != nil {
 		return fmt.Errorf("open file %s failed, err: %w", localFilePath, err)
 	}
@@ -81,7 +81,7 @@ func (c *Commandline) RepWrite(localFilePath string, bucketID int, objectName st
 	}
 	fileSize := fileInfo.Size()
 
-	err = services.ObjectSvc(c.svc).UploadRepObject(0, bucketID, objectName, file, fileSize, repNum)
+	err = services.ObjectSvc(c.svc).UploadRepObject(0, bucketID, objectName, file, fileSize, repCount)
 	if err != nil {
 		return fmt.Errorf("upload file data failed, err: %w", err)
 	}
diff --git a/internal/config/config.go b/internal/config/config.go
index 3df843f..ab2add5 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -8,14 +8,14 @@ import (
 )
 
 type Config struct {
-	GRPCPort           int           `json:"grpcPort"`
-	GRCPPacketSize     int64         `json:"grpcPacketSize"`
-	MaxReplicateNumber int           `json:"maxReplicateNumber"`
-	LocalIP            string        `json:"localIP"`
-	ExternalIP         string        `json:"externalIP"`
-	Logger             logger.Config `json:"logger"`
-	RabbitMQ           racfg.Config  `json:"rabbitMQ"`
-	IPFS               *ipfs.Config  `json:"ipfs"` // non-nil means an ipfs daemon exists on the client
+	GRPCPort       int           `json:"grpcPort"`
+	GRCPPacketSize int64         `json:"grpcPacketSize"`
+	MaxRepCount    int           `json:"maxRepCount"`
+	LocalIP        string        `json:"localIP"`
+	ExternalIP     string        `json:"externalIP"`
+	Logger         logger.Config `json:"logger"`
+	RabbitMQ       racfg.Config  `json:"rabbitMQ"`
+	IPFS           *ipfs.Config  `json:"ipfs"` // non-nil means an ipfs daemon exists on the client
 }
 
 var cfg Config
diff --git a/internal/services/client_command_ec.go b/internal/services/client_command_ec.go
index 958eb72..fa856b6 100644
--- a/internal/services/client_command_ec.go
+++ b/internal/services/client_command_ec.go
@@ -26,7 +26,7 @@ func EcWrite(localFilePath string, bucketID int, objectName string, ecName strin
 	if err != nil {
 		return fmt.Errorf("get file %s state failed, err: %w", localFilePath, err)
 	}
-	fileSizeInBytes := fileInfo.Size()
+	fileSize := fileInfo.Size()
 
 	// call the erasure-coding library to get the encoding parameters and generator matrix
 	ecPolicies := *utils.GetEcPolicy()
@@ -45,7 +45,7 @@ func EcWrite(localFilePath string, bucketID int, objectName string, ecName strin
 	var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} // 2 should be replaced by ecK, 3 by ecN
 
 	// compute the number of packets per block
-	numPacket := (fileSizeInBytes + int64(ecK)*config.Cfg().GRCPPacketSize - 1) / (int64(ecK) * config.Cfg().GRCPPacketSize)
+	numPacket := (fileSize + int64(ecK)*config.Cfg().GRCPPacketSize - 1) / (int64(ecK) * config.Cfg().GRCPPacketSize)
 	fmt.Println(numPacket)
 
 	userId := 0
@@ -56,7 +56,7 @@ func EcWrite(localFilePath string, bucketID int, objectName string, ecName strin
 	defer coorClient.Close()
 
 	// send the write request, asking the Coordinator to assign the node IPs to write to
-	ecWriteResp, err := coorClient.ECWrite(bucketName, objectName, fileSizeInBytes, ecName, userId)
+	ecWriteResp, err := coorClient.ECWrite(bucketName, objectName, fileSize, ecName, userId)
 	if err != nil {
 		return fmt.Errorf("request to coordinator failed, err: %w", err)
 	}
@@ -75,7 +75,7 @@ func EcWrite(localFilePath string, bucketID int, objectName string, ecName strin
 	}
 	hashs := make([]string, ecN)
 	// start the actual write
-	go load(localFilePath, loadBufs[:ecN], ecK, numPacket*int64(ecK), fileSizeInBytes) // load data from the local filesystem
+	go load(localFilePath, loadBufs[:ecN], ecK, numPacket*int64(ecK), fileSize) // load data from the local filesystem
 	go encode(loadBufs[:ecN], encodeBufs[:ecN], ecK, coefs, numPacket)
 
 	var wg sync.WaitGroup
@@ -99,7 +99,7 @@ func EcWrite(localFilePath string, bucketID int, objectName string, ecName strin
 	*/
 }
 
-func load(localFilePath string, loadBufs []chan []byte, ecK int, totalNumPacket int64, fileSizeInBytes int64) {
+func load(localFilePath string, loadBufs []chan []byte, ecK int, totalNumPacket int64, fileSize int64) {
 	fmt.Println("load " + localFilePath)
 
 	file, _ := os.Open(localFilePath)
@@ -318,7 +318,7 @@ func persist(inBuf []chan []byte, numPacket int64, localFilePath string, wg *syn
 	wg.Done()
 }
 
-func ecRead(fileSizeInBytes int64, nodeIPs []string, blockHashs []string, blockIds []int, ecName string, localFilePath string) {
+func ecRead(fileSize int64, nodeIPs []string, blockHashs []string, blockIds []int, ecName string, localFilePath string) {
 	// obtain the following parameters from ecName
 	wg := sync.WaitGroup{}
 	ecPolicies := *utils.GetEcPolicy()
@@ -328,7 +328,7 @@ func ecRead(fileSizeInBytes int64, nodeIPs []string, blockHashs []string, blockI
 	ecN := ecPolicy.GetN()
 	var coefs = [][]int64{{1, 1, 1}, {1, 2, 3}} // 2 should be replaced by ecK, 3 by ecN
 
-	numPacket := (fileSizeInBytes + int64(ecK)*config.Cfg().GRCPPacketSize - 1) / (int64(ecK) * config.Cfg().GRCPPacketSize)
+	numPacket := (fileSize + int64(ecK)*config.Cfg().GRCPPacketSize - 1) / (int64(ecK) * config.Cfg().GRCPPacketSize)
 	fmt.Println(numPacket)
 	// create channels
 	getBufs := make([]chan []byte, ecN)
diff --git a/internal/services/object.go b/internal/services/object.go
index 4ac0da4..39befc8 100644
--- a/internal/services/object.go
+++ b/internal/services/object.go
@@ -9,12 +9,14 @@ import (
 	"gitlink.org.cn/cloudream/db/model"
 	agentcaller "gitlink.org.cn/cloudream/proto"
 	agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
+	ramsg "gitlink.org.cn/cloudream/rabbitmq/message"
 	agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
"gitlink.org.cn/cloudream/rabbitmq/message/agent" coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator" "gitlink.org.cn/cloudream/utils/consts" mygrpc "gitlink.org.cn/cloudream/utils/grpc" myio "gitlink.org.cn/cloudream/utils/io" log "gitlink.org.cn/cloudream/utils/logger" + serder "gitlink.org.cn/cloudream/utils/serder" mysort "gitlink.org.cn/cloudream/utils/sort" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -36,7 +38,7 @@ func (svc *ObjectService) GetObject(userID int, objectID int) (model.Object, err } func (svc *ObjectService) DownloadObject(userID int, objectID int) (io.ReadCloser, error) { - preDownloadResp, err := svc.coordinator.PreDownloadObject(coormsg.NewReadCommandBody(objectID, userID, config.Cfg().ExternalIP)) + preDownloadResp, err := svc.coordinator.PreDownloadObject(coormsg.NewPreDownloadObjectBody(objectID, userID, config.Cfg().ExternalIP)) if err != nil { return nil, fmt.Errorf("request to coordinator failed, err: %w", err) } @@ -46,22 +48,28 @@ func (svc *ObjectService) DownloadObject(userID int, objectID int) (io.ReadClose switch preDownloadResp.Body.Redundancy { case consts.REDUNDANCY_REP: - if len(preDownloadResp.Body.Entries) == 0 { + var repInfo ramsg.RespObjectRepInfo + err := serder.MapToObject(preDownloadResp.Body.RedundancyData.(map[string]any), &repInfo) + if err != nil { + return nil, fmt.Errorf("redundancy data to rep info failed, err: %w", err) + } + + if len(repInfo.Nodes) == 0 { return nil, fmt.Errorf("no node has this file") } // 选择下载节点 - entry := svc.chooseDownloadNode(preDownloadResp.Body.Entries) + entry := svc.chooseDownloadNode(repInfo.Nodes) // 如果客户端与节点在同一个地域,则使用内网地址连接节点 - nodeIP := entry.NodeExternalIP + nodeIP := entry.ExternalIP if entry.IsSameLocation { - nodeIP = entry.NodeLocalIP + nodeIP = entry.LocalIP - log.Infof("client and node %d are at the same location, use local ip\n", entry.NodeID) + log.Infof("client and node %d are at the same location, use local ip\n", entry.ID) } - reader, err := svc.downloadRepObject(nodeIP, entry.FileHash) + reader, err := svc.downloadRepObject(nodeIP, repInfo.FileHash) if err != nil { return nil, fmt.Errorf("rep read failed, err: %w", err) } @@ -70,7 +78,7 @@ func (svc *ObjectService) DownloadObject(userID int, objectID int) (io.ReadClose //case consts.REDUNDANCY_EC: // TODO EC部分的代码要考虑重构 - // ecRead(readResp.FileSizeInBytes, readResp.NodeIPs, readResp.Hashes, readResp.BlockIDs, *readResp.ECName) + // ecRead(readResp.FileSize, readResp.NodeIPs, readResp.Hashes, readResp.BlockIDs, *readResp.ECName) } return nil, fmt.Errorf("unsupported redundancy type: %s", preDownloadResp.Body.Redundancy) @@ -79,8 +87,8 @@ func (svc *ObjectService) DownloadObject(userID int, objectID int) (io.ReadClose // chooseDownloadNode 选择一个下载节点 // 1. 从与当前客户端相同地域的节点中随机选一个 // 2. 
-func (svc *ObjectService) chooseDownloadNode(entries []coormsg.PreDownloadObjectRespEntry) coormsg.PreDownloadObjectRespEntry {
-	sameLocationEntries := lo.Filter(entries, func(e coormsg.PreDownloadObjectRespEntry, i int) bool { return e.IsSameLocation })
+func (svc *ObjectService) chooseDownloadNode(entries []ramsg.RespNode) ramsg.RespNode {
+	sameLocationEntries := lo.Filter(entries, func(e ramsg.RespNode, i int) bool { return e.IsSameLocation })
 	if len(sameLocationEntries) > 0 {
 		return sameLocationEntries[rand.Intn(len(sameLocationEntries))]
 	}
@@ -132,7 +140,7 @@ func (svc *ObjectService) downloadFromLocalIPFS(fileHash string) (io.ReadCloser,
 	return reader, nil
 }
 
-func (svc *ObjectService) UploadRepObject(userID int, bucketID int, objectName string, file io.ReadCloser, fileSize int64, repNum int) error {
+func (svc *ObjectService) UploadRepObject(userID int, bucketID int, objectName string, file io.ReadCloser, fileSize int64, repCount int) error {
 	// send the write request, asking the Coordinator to assign the node IPs to write to
 	repWriteResp, err := svc.coordinator.PreUploadRepObject(coormsg.NewPreUploadRepObjectBody(bucketID, objectName, fileSize, userID, config.Cfg().ExternalIP))
 
@@ -182,7 +190,7 @@ func (svc *ObjectService) UploadRepObject(userID int, bucketID int, objectName s
 	}
 
 	// record the hash of the written file
-	createObjectResp, err := svc.coordinator.CreateRepObject(coormsg.NewCreateRepObjectBody(bucketID, objectName, fileSize, repNum, userID, uploadedNodeIDs, fileHash))
+	createObjectResp, err := svc.coordinator.CreateRepObject(coormsg.NewCreateRepObjectBody(bucketID, objectName, fileSize, repCount, userID, uploadedNodeIDs, fileHash))
 	if err != nil {
 		return fmt.Errorf("request to coordinator failed, err: %w", err)
 	}
@@ -264,8 +272,8 @@ func (svc *ObjectService) uploadToLocalIPFS(file io.ReadCloser, nodeID int) (str
 // chooseUploadNode picks a node to upload the file to:
 // 1. pick one at random from the nodes in the same location as the client
 // 2. if none is available, pick one at random from all nodes
-func (svc *ObjectService) chooseUploadNode(nodes []coormsg.PreUploadRespNode) coormsg.PreUploadRespNode {
-	sameLocationNodes := lo.Filter(nodes, func(e coormsg.PreUploadRespNode, i int) bool { return e.IsSameLocation })
+func (svc *ObjectService) chooseUploadNode(nodes []ramsg.RespNode) ramsg.RespNode {
+	sameLocationNodes := lo.Filter(nodes, func(e ramsg.RespNode, i int) bool { return e.IsSameLocation })
 	if len(sameLocationNodes) > 0 {
 		return sameLocationNodes[rand.Intn(len(sameLocationNodes))]
 	}
diff --git a/internal/services/storage.go b/internal/services/storage.go
index c15187e..cbda021 100644
--- a/internal/services/storage.go
+++ b/internal/services/storage.go
@@ -7,7 +7,6 @@ import (
 	agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
 	agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
 	coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator"
-	"gitlink.org.cn/cloudream/utils/consts"
 )
 
 type StorageService struct {
@@ -35,24 +34,19 @@ func (svc *StorageService) MoveObjectToStorage(userID int, objectID int, storage
 	}
 	defer agentClient.Close()
 
-	switch preMoveResp.Body.Redundancy {
-	case consts.REDUNDANCY_REP:
-		agentMoveResp, err := agentClient.RepMove(agtmsg.NewRepMoveCommandBody(preMoveResp.Body.Directory, preMoveResp.Body.Hashes, objectID, userID, preMoveResp.Body.FileSizeInBytes))
-		if err != nil {
-			return fmt.Errorf("request to agent %d failed, err: %w", preMoveResp.Body.NodeID, err)
-		}
-		if agentMoveResp.IsFailed() {
-			return fmt.Errorf("agent %d operation failed, code: %s, messsage: %s", preMoveResp.Body.NodeID, agentMoveResp.ErrorCode, agentMoveResp.ErrorMessage)
-		}
-
-	case consts.REDUNDANCY_EC:
-		agentMoveResp, err := agentClient.ECMove(agtmsg.NewECMoveCommandBody(preMoveResp.Body.Directory, preMoveResp.Body.Hashes, preMoveResp.Body.IDs, *preMoveResp.Body.ECName, objectID, userID, preMoveResp.Body.FileSizeInBytes))
-		if err != nil {
-			return fmt.Errorf("request to agent %d failed, err: %w", preMoveResp.Body.NodeID, err)
-		}
-		if agentMoveResp.IsFailed() {
-			return fmt.Errorf("agent %d operation failed, code: %s, messsage: %s", preMoveResp.Body.NodeID, agentMoveResp.ErrorCode, agentMoveResp.ErrorMessage)
-		}
+	agentMoveResp, err := agentClient.MoveObjectToStorage(
+		agtmsg.NewMoveObjectToStorageBody(preMoveResp.Body.Directory,
+			objectID,
+			userID,
+			preMoveResp.Body.FileSize,
+			preMoveResp.Body.Redundancy,
+			preMoveResp.Body.RedundancyData,
+		))
+	if err != nil {
+		return fmt.Errorf("request to agent %d failed, err: %w", preMoveResp.Body.NodeID, err)
+	}
+	if agentMoveResp.IsFailed() {
+		return fmt.Errorf("agent %d operation failed, code: %s, messsage: %s", preMoveResp.Body.NodeID, agentMoveResp.ErrorCode, agentMoveResp.ErrorMessage)
 	}
 
 	moveResp, err := svc.coordinator.MoveObjectToStorage(coormsg.NewMoveObjectToStorageBody(objectID, storageID, userID))
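
For reference, below is a minimal, self-contained sketch of the decode-and-choose pattern that the reworked DownloadObject path relies on: the untyped RedundancyData map is decoded into a typed rep-info struct, and a download node is then picked, preferring nodes in the same location as the client. The local RespNode/RespObjectRepInfo types mirror the field names used in the diff, but their JSON keys are assumptions, and mapToObject is a hypothetical stand-in (a plain JSON round-trip) for serder.MapToObject rather than the actual cloudream utility.

package main

import (
	"encoding/json"
	"fmt"
	"math/rand"
)

// Local stand-ins for ramsg.RespNode / ramsg.RespObjectRepInfo; the field names follow
// the diff, but the JSON tags are assumptions for this sketch only.
type RespNode struct {
	ID             int    `json:"id"`
	ExternalIP     string `json:"externalIP"`
	LocalIP        string `json:"localIP"`
	IsSameLocation bool   `json:"isSameLocation"`
}

type RespObjectRepInfo struct {
	FileHash string     `json:"fileHash"`
	Nodes    []RespNode `json:"nodes"`
}

// mapToObject is a hypothetical stand-in for serder.MapToObject: it decodes a generic
// map (like preDownloadResp.Body.RedundancyData) into a typed struct via a JSON round-trip.
func mapToObject(m map[string]any, out any) error {
	data, err := json.Marshal(m)
	if err != nil {
		return err
	}
	return json.Unmarshal(data, out)
}

// chooseDownloadNode mirrors the selection rule in the diff: prefer a random node in the
// same location as the client, otherwise fall back to a random node from the full list.
func chooseDownloadNode(nodes []RespNode) RespNode {
	var same []RespNode
	for _, n := range nodes {
		if n.IsSameLocation {
			same = append(same, n)
		}
	}
	if len(same) > 0 {
		return same[rand.Intn(len(same))]
	}
	return nodes[rand.Intn(len(nodes))]
}

func main() {
	// RedundancyData as it might arrive after generic (de)serialization.
	redundancyData := map[string]any{
		"fileHash": "QmExampleHash",
		"nodes": []any{
			map[string]any{"id": 1, "externalIP": "1.2.3.4", "localIP": "10.0.0.4", "isSameLocation": true},
			map[string]any{"id": 2, "externalIP": "5.6.7.8", "localIP": "10.0.1.8", "isSameLocation": false},
		},
	}

	var repInfo RespObjectRepInfo
	if err := mapToObject(redundancyData, &repInfo); err != nil {
		panic(err)
	}

	entry := chooseDownloadNode(repInfo.Nodes)
	nodeIP := entry.ExternalIP
	if entry.IsSameLocation {
		nodeIP = entry.LocalIP // same location: prefer the intranet address
	}
	fmt.Printf("download %s from node %d at %s\n", repInfo.FileHash, entry.ID, nodeIP)
}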