diff --git a/internal/services/object.go b/internal/services/object.go index d430253..1be3105 100644 --- a/internal/services/object.go +++ b/internal/services/object.go @@ -4,7 +4,6 @@ import ( "fmt" "io" "math/rand" - "sync" "gitlink.org.cn/cloudream/client/internal/config" "gitlink.org.cn/cloudream/db/model" @@ -34,22 +33,22 @@ func (svc *ObjectService) GetObject(userID int, objectID int) (model.Object, err } func (svc *ObjectService) DownloadObject(userID int, objectID int) (io.ReadCloser, error) { - readResp, err := svc.coordinator.Read(coormsg.NewReadCommandBody(objectID, userID, config.Cfg().ExternalIP)) + preDownloadResp, err := svc.coordinator.PreDownloadObject(coormsg.NewReadCommandBody(objectID, userID, config.Cfg().ExternalIP)) if err != nil { return nil, fmt.Errorf("request to coordinator failed, err: %w", err) } - if readResp.ErrorCode != errorcode.OK { - return nil, fmt.Errorf("coordinator operation failed, code: %s, message: %s", readResp.ErrorCode, readResp.ErrorMessage) + if preDownloadResp.ErrorCode != errorcode.OK { + return nil, fmt.Errorf("coordinator operation failed, code: %s, message: %s", preDownloadResp.ErrorCode, preDownloadResp.ErrorMessage) } - switch readResp.Body.Redundancy { + switch preDownloadResp.Body.Redundancy { case consts.REDUNDANCY_REP: - if len(readResp.Body.Entries) == 0 { + if len(preDownloadResp.Body.Entries) == 0 { return nil, fmt.Errorf("no node has this file") } // 选择下载节点 - entry := svc.chooseDownloadNode(readResp.Body.Entries) + entry := svc.chooseDownloadNode(preDownloadResp.Body.Entries) // 如果客户端与节点在同一个地域,则使用内网地址连接节点 nodeIP := entry.NodeExternalIP @@ -71,19 +70,19 @@ func (svc *ObjectService) DownloadObject(userID int, objectID int) (io.ReadClose // ecRead(readResp.FileSizeInBytes, readResp.NodeIPs, readResp.Hashes, readResp.BlockIDs, *readResp.ECName) } - return nil, fmt.Errorf("unsupported redundancy type: %s", readResp.Body.Redundancy) + return nil, fmt.Errorf("unsupported redundancy type: %s", preDownloadResp.Body.Redundancy) } // chooseDownloadNode 选择一个下载节点 // 1. 从与当前客户端相同地域的节点中随机选一个 // 2. 没有用的话从所有节点中随机选一个 -func (svc *ObjectService) chooseDownloadNode(entries []coormsg.ReadRespEntry) coormsg.ReadRespEntry { - sameLocationEntries := lo.Filter(entries, func(e coormsg.ReadRespEntry, i int) bool { return e.IsSameLocation }) +func (svc *ObjectService) chooseDownloadNode(entries []coormsg.PreDownloadObjectRespEntry) coormsg.PreDownloadObjectRespEntry { + sameLocationEntries := lo.Filter(entries, func(e coormsg.PreDownloadObjectRespEntry, i int) bool { return e.IsSameLocation }) if len(sameLocationEntries) > 0 { - return sameLocationEntries[rand.Int()%len(sameLocationEntries)] + return sameLocationEntries[rand.Intn(len(sameLocationEntries))] } - return entries[rand.Int()%len(entries)] + return entries[rand.Intn(len(entries))] } func (svc *ObjectService) downloadAsRepObject(nodeIP string, fileHash string) (io.ReadCloser, error) { @@ -114,7 +113,7 @@ func (svc *ObjectService) downloadAsRepObject(nodeIP string, fileHash string) (i func (svc *ObjectService) UploadRepObject(userID int, bucketID int, objectName string, file io.ReadCloser, fileSize int64, repNum int) error { //发送写请求,请求Coor分配写入节点Ip - repWriteResp, err := svc.coordinator.RepWrite(coormsg.NewRepWriteCommandBody(bucketID, objectName, fileSize, repNum, userID, config.Cfg().ExternalIP)) + repWriteResp, err := svc.coordinator.PreUploadRepObject(coormsg.NewPreUploadRepObjectBody(bucketID, objectName, fileSize, userID, config.Cfg().ExternalIP)) if err != nil { return fmt.Errorf("request to coordinator failed, err: %w", err) } @@ -128,91 +127,72 @@ func (svc *ObjectService) UploadRepObject(userID int, bucketID int, objectName s 否则,像目前一样,使用grpc向指定节点获取 */ - senders := make([]fileSender, repNum) + uploadNode := svc.chooseUploadNode(repWriteResp.Body.Nodes) + + // 如果客户端与节点在同一个地域,则使用内网地址连接节点 + nodeIP := uploadNode.ExternalIP + if uploadNode.IsSameLocation { + nodeIP = uploadNode.LocalIP + // TODO 以后考虑用log + fmt.Printf("client and node %d are at the same location, use local ip\n", uploadNode.ID) + } // 建立grpc连接,发送请求 - svc.startSendFile(repNum, senders, repWriteResp.Body.Nodes) + grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort) + grpcCon, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) + } + defer grpcCon.Close() + + client := agentcaller.NewFileTransportClient(grpcCon) + upload, err := mygrpc.SendFileAsStream(client) + if err != nil { + return fmt.Errorf("request to send file failed, err: %w", err) + } - // 向每个节点发送数据 - err = svc.sendFileData(file, repNum, senders) + // 发送文件数据 + err = svc.sendFileData(file, upload) if err != nil { + // 发生错误则关闭连接 + upload.Abort(io.ErrClosedPipe) return err } // 发送EOF消息,并获得FileHash - svc.sendFinish(repNum, senders) - - // 收集发送成功的节点以及返回的hash - var sucNodeIDs []int - var sucFileHashes []string - for i := 0; i < repNum; i++ { - sender := &senders[i] - - if sender.err == nil { - sucNodeIDs = append(sucNodeIDs, sender.nodeID) - sucFileHashes = append(sucFileHashes, sender.fileHash) - } + fileHash, err := upload.Finish() + if err != nil { + upload.Abort(io.ErrClosedPipe) + return fmt.Errorf("send EOF failed, err: %w", err) } // 记录写入的文件的Hash - // TODO 如果一个都没有写成功,那么是否要发送这个请求? - writeRepHashResp, err := svc.coordinator.WriteRepHash(coormsg.NewWriteRepHashCommandBody(bucketID, objectName, fileSize, repNum, userID, sucNodeIDs, sucFileHashes)) + createObjectResp, err := svc.coordinator.CreateRepObject(coormsg.NewCreateRepObjectBody(bucketID, objectName, fileSize, repNum, userID, uploadNode.ID, fileHash)) if err != nil { return fmt.Errorf("request to coordinator failed, err: %w", err) } - if writeRepHashResp.ErrorCode != errorcode.OK { - return fmt.Errorf("coordinator WriteRepHash failed, code: %s, message: %s", writeRepHashResp.ErrorCode, writeRepHashResp.ErrorMessage) + if createObjectResp.ErrorCode != errorcode.OK { + return fmt.Errorf("coordinator WriteRepHash failed, code: %s, message: %s", createObjectResp.ErrorCode, createObjectResp.ErrorMessage) } return nil } -type fileSender struct { - grpcCon *grpc.ClientConn - stream mygrpc.FileWriteCloser[string] - nodeID int - fileHash string - err error -} - -func (svc *ObjectService) startSendFile(numRep int, senders []fileSender, nodes []coormsg.WriteRespNode) { - for i := 0; i < numRep; i++ { - sender := &senders[i] - - sender.nodeID = nodes[i].ID - - // 如果客户端与节点在同一个地域,则使用内网地址连接节点 - nodeIP := nodes[i].ExternalIP - if nodes[i].IsSameLocation { - nodeIP = nodes[i].LocalIP - // TODO 以后考虑用log - fmt.Printf("client and node %d are at the same location, use local ip\n", nodes[i].ID) - } - - grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort) - conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) - - if err != nil { - sender.err = fmt.Errorf("connect to grpc server at %s failed, err: %w", grpcAddr, err) - continue - } - - client := agentcaller.NewFileTransportClient(conn) - stream, err := mygrpc.SendFileAsStream(client) - if err != nil { - conn.Close() - sender.err = fmt.Errorf("request to send file failed, err: %w", err) - continue - } - - sender.grpcCon = conn - sender.stream = stream +// chooseUploadNode 选择一个上传文件的节点 +// 1. 从与当前客户端相同地域的节点中随机选一个 +// 2. 没有用的话从所有节点中随机选一个 +func (svc *ObjectService) chooseUploadNode(nodes []coormsg.PreUploadRespNode) coormsg.PreUploadRespNode { + sameLocationNodes := lo.Filter(nodes, func(e coormsg.PreUploadRespNode, i int) bool { return e.IsSameLocation }) + if len(sameLocationNodes) > 0 { + return sameLocationNodes[rand.Intn(len(sameLocationNodes))] } + + return nodes[rand.Intn(len(nodes))] } -func (svc *ObjectService) sendFileData(file io.ReadCloser, numRep int, senders []fileSender) error { +func (svc *ObjectService) sendFileData(file io.ReadCloser, upload mygrpc.FileWriteCloser[string]) error { - // 共用的发送数据缓冲区 + // 发送数据缓冲区 buf := make([]byte, 2048) for { @@ -220,40 +200,10 @@ func (svc *ObjectService) sendFileData(file io.ReadCloser, numRep int, senders [ readCnt, err := file.Read(buf) if readCnt > 0 { - // 并行的向每个节点发送数据 - hasSender := false - var sendWg sync.WaitGroup - for i := 0; i < numRep; i++ { - sender := &senders[i] - - // 发生了错误的跳过 - if sender.err != nil { - continue - } - - hasSender = true - - sendWg.Add(1) - go func() { - err := myio.WriteAll(sender.stream, buf[:readCnt]) - - // 发生错误则关闭连接 - if err != nil { - sender.stream.Abort(io.ErrClosedPipe) - sender.grpcCon.Close() - sender.err = fmt.Errorf("send file data failed, err: %w", err) - } - - sendWg.Done() - }() - } - - // 等待向每个节点发送数据结束 - sendWg.Wait() + err := myio.WriteAll(upload, buf[:readCnt]) - // 如果所有节点都发送失败,则不要再继续读取文件数据了 - if !hasSender { - break + if err != nil { + return fmt.Errorf("send file data failed, err: %w", err) } } @@ -263,48 +213,12 @@ func (svc *ObjectService) sendFileData(file io.ReadCloser, numRep int, senders [ } if err != nil { - // 读取失败则断开所有连接 - for i := 0; i < numRep; i++ { - sender := &senders[i] - - if sender.err != nil { - continue - } - - sender.stream.Abort(io.ErrClosedPipe) - sender.grpcCon.Close() - sender.err = fmt.Errorf("read file data failed, err: %w", err) - } - return fmt.Errorf("read file data failed, err: %w", err) } } return nil } -func (svc *ObjectService) sendFinish(numRep int, senders []fileSender) { - for i := 0; i < numRep; i++ { - sender := &senders[i] - - // 发生了错误的跳过 - if sender.err != nil { - continue - } - - fileHash, err := sender.stream.Finish() - if err != nil { - sender.err = err - sender.stream.Abort(io.ErrClosedPipe) - sender.grpcCon.Close() - continue - } - - sender.fileHash = fileHash - sender.err = err - sender.grpcCon.Close() - } -} - func (svc *ObjectService) UploadECObject(userID int, file io.ReadCloser, fileSize int64, ecName string) error { // TODO panic("not implement yet")