From 6e17a359727d992b2551058de3be030ee0e945a6 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 25 Apr 2023 14:36:27 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8B=86=E5=88=86=E8=B0=83=E7=94=A8=E8=B7=AF?= =?UTF-8?q?=E5=BE=84=E4=B8=8A=E7=9A=84=E4=BB=A3=E7=A0=81=EF=BC=9B=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E5=88=9B=E5=BB=BABucket=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- commandline.go | 23 ++++++++++++---- services/bucket.go | 35 +++++++++++++++++++----- services/object.go | 66 +++++++++++++-------------------------------- services/service.go | 2 +- services/storage.go | 62 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 127 insertions(+), 61 deletions(-) create mode 100644 services/storage.go diff --git a/commandline.go b/commandline.go index 7138e60..f3fec06 100644 --- a/commandline.go +++ b/commandline.go @@ -9,6 +9,7 @@ import ( "github.com/jedib0t/go-pretty/v6/table" "gitlink.org.cn/cloudream/client/config" + "gitlink.org.cn/cloudream/client/services" myio "gitlink.org.cn/cloudream/utils/io" ) @@ -118,7 +119,7 @@ func Read(localFilePath string, objectID int) error { 否则,像目前一样,使用grpc向指定节点获取 */ // 下载文件 - reader, err := svc.DownloadObject(0, objectID) + reader, err := services.ObjectSvc(svc).DownloadObject(0, objectID) if err != nil { return fmt.Errorf("download object failed, err: %w", err) } @@ -156,7 +157,7 @@ func RepWrite(localFilePath string, bucketID int, objectName string, repNum int) } fileSize := fileInfo.Size() - err = svc.UploadRepObject(0, bucketID, objectName, file, fileSize, repNum) + err = services.ObjectSvc(svc).UploadRepObject(0, bucketID, objectName, file, fileSize, repNum) if err != nil { return fmt.Errorf("upload file data failed, err: %w", err) } @@ -165,7 +166,7 @@ func RepWrite(localFilePath string, bucketID int, objectName string, repNum int) } func Move(objectID int, storageID int) error { - return svc.MoveObjectToStorage(0, objectID, storageID) + return services.StorageSvc(svc).MoveObjectToStorage(0, objectID, storageID) } func EcWrite(localFilePath string, bucketID int, objectName string, ecName string) error { @@ -176,7 +177,7 @@ func EcWrite(localFilePath string, bucketID int, objectName string, ecName strin func GetUserBuckets() error { userID := 0 - buckets, err := svc.GetUserBuckets(userID) + buckets, err := services.BucketSvc(svc).GetUserBuckets(userID) if err != nil { return err } @@ -197,7 +198,7 @@ func GetUserBuckets() error { func GetBucketObjects(bucketID int) error { userID := 0 - objects, err := svc.GetBucketObjects(userID, bucketID) + objects, err := services.BucketSvc(svc).GetBucketObjects(userID, bucketID) if err != nil { return err } @@ -214,3 +215,15 @@ func GetBucketObjects(bucketID int) error { fmt.Print(tb.Render()) return nil } + +func CreateBucket(bucketName string) error { + userID := 0 + + bucketID, err := services.BucketSvc(svc).CreateBucket(userID, bucketName) + if err != nil { + return err + } + + fmt.Printf("Create bucket %s success, id: %d", bucketName, bucketID) + return nil +} diff --git a/services/bucket.go b/services/bucket.go index e7b1bb7..3b4c088 100644 --- a/services/bucket.go +++ b/services/bucket.go @@ -6,35 +6,56 @@ import ( "gitlink.org.cn/cloudream/db/model" ) -func (svc *Service) GetBucket(userID int, bucketID int) (model.Bucket, error) { +type BucketService struct { + *Service +} + +func BucketSvc(svc *Service) *BucketService { + return &BucketService{Service: svc} +} + +func (svc *BucketService) GetBucket(userID int, bucketID int) (model.Bucket, error) { // TODO panic("not implement yet") } -func (svc *Service) GetUserBuckets(userID int) ([]model.Bucket, error) { +func (svc *BucketService) GetUserBuckets(userID int) ([]model.Bucket, error) { resp, err := svc.coordinator.GetUserBuckets(userID) if err != nil { return nil, fmt.Errorf("get user buckets failed, err: %w", err) } + if !resp.IsOK() { + return nil, fmt.Errorf("create bucket objects failed, code: %s, message: %s", resp.ErrorCode, resp.Message) + } return resp.Buckets, nil } -func (svc *Service) GetBucketObjects(userID int, bucketID int) ([]model.Object, error) { +func (svc *BucketService) GetBucketObjects(userID int, bucketID int) ([]model.Object, error) { resp, err := svc.coordinator.GetBucketObjects(userID, bucketID) if err != nil { return nil, fmt.Errorf("get bucket objects failed, err: %w", err) } + if !resp.IsOK() { + return nil, fmt.Errorf("create bucket objects failed, code: %s, message: %s", resp.ErrorCode, resp.Message) + } return resp.Objects, nil } -func (svc *Service) CreateBucket(userID int, bucketName string) (model.Bucket, error) { - // TODO - panic("not implement yet") +func (svc *BucketService) CreateBucket(userID int, bucketName string) (int, error) { + resp, err := svc.coordinator.CreateBucket(userID, bucketName) + if err != nil { + return 0, fmt.Errorf("create bucket objects failed, err: %w", err) + } + if !resp.IsOK() { + return 0, fmt.Errorf("create bucket objects failed, code: %s, message: %s", resp.ErrorCode, resp.Message) + } + + return resp.BucketID, nil } -func (src *Service) DeleteBucket(userID int, bucketID int) error { +func (src *BucketService) DeleteBucket(userID int, bucketID int) error { // TODO panic("not implement yet") } diff --git a/services/object.go b/services/object.go index c9d20cc..28b3c7e 100644 --- a/services/object.go +++ b/services/object.go @@ -8,7 +8,6 @@ import ( "gitlink.org.cn/cloudream/client/config" "gitlink.org.cn/cloudream/db/model" agentcaller "gitlink.org.cn/cloudream/proto" - racli "gitlink.org.cn/cloudream/rabbitmq/client" "gitlink.org.cn/cloudream/utils/consts" "gitlink.org.cn/cloudream/utils/consts/errorcode" mygrpc "gitlink.org.cn/cloudream/utils/grpc" @@ -17,14 +16,20 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -type BucketService = Service +type ObjectService struct { + *Service +} + +func ObjectSvc(svc *Service) *ObjectService { + return &ObjectService{Service: svc} +} -func (svc *BucketService) GetObject(userID int, objectID int) (model.Object, error) { +func (svc *ObjectService) GetObject(userID int, objectID int) (model.Object, error) { // TODO panic("not implement yet") } -func (svc *BucketService) DownloadObject(userID int, objectID int) (io.ReadCloser, error) { +func (svc *ObjectService) DownloadObject(userID int, objectID int) (io.ReadCloser, error) { readResp, err := svc.coordinator.Read(objectID, userID) if err != nil { return nil, fmt.Errorf("request to coordinator failed, err: %w", err) @@ -55,7 +60,7 @@ func (svc *BucketService) DownloadObject(userID int, objectID int) (io.ReadClose return nil, fmt.Errorf("unsupported redundancy type: %s", readResp.Redundancy) } -func (svc *BucketService) downloadAsRepObject(nodeIP string, fileHash string) (io.ReadCloser, error) { +func (svc *ObjectService) downloadAsRepObject(nodeIP string, fileHash string) (io.ReadCloser, error) { // 连接grpc grpcAddr := fmt.Sprintf("%s:%d", nodeIP, config.Cfg().GRPCPort) conn, err := grpc.Dial(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -78,7 +83,7 @@ func (svc *BucketService) downloadAsRepObject(nodeIP string, fileHash string) (i return reader, nil } -func (svc *BucketService) UploadRepObject(userID int, bucketID int, objectName string, file io.ReadCloser, fileSize int64, repNum int) error { +func (svc *ObjectService) UploadRepObject(userID int, bucketID int, objectName string, file io.ReadCloser, fileSize int64, repNum int) error { //发送写请求,请求Coor分配写入节点Ip repWriteResp, err := svc.coordinator.RepWrite(bucketID, objectName, fileSize, repNum, userID) @@ -142,7 +147,7 @@ type fileSender struct { err error } -func (svc *BucketService) startSendFile(numRep int, senders []fileSender, nodeIDs []int, nodeIPs []string) { +func (svc *ObjectService) startSendFile(numRep int, senders []fileSender, nodeIDs []int, nodeIPs []string) { for i := 0; i < numRep; i++ { sender := &senders[i] @@ -169,7 +174,7 @@ func (svc *BucketService) startSendFile(numRep int, senders []fileSender, nodeID } } -func (svc *BucketService) sendFileData(file io.ReadCloser, numRep int, senders []fileSender) error { +func (svc *ObjectService) sendFileData(file io.ReadCloser, numRep int, senders []fileSender) error { // 共用的发送数据缓冲区 buf := make([]byte, 2048) @@ -239,7 +244,7 @@ func (svc *BucketService) sendFileData(file io.ReadCloser, numRep int, senders [ return nil } -func (svc *BucketService) sendFinish(numRep int, senders []fileSender) { +func (svc *ObjectService) sendFinish(numRep int, senders []fileSender) { for i := 0; i < numRep; i++ { sender := &senders[i] @@ -262,47 +267,12 @@ func (svc *BucketService) sendFinish(numRep int, senders []fileSender) { } } -func (svc *BucketService) UploadECObject(userID int, file io.ReadCloser, fileSize int64, ecName string) error { +func (svc *ObjectService) UploadECObject(userID int, file io.ReadCloser, fileSize int64, ecName string) error { // TODO panic("not implement yet") } -func (svc *BucketService) MoveObjectToStorage(userID int, objectID int, storageID int) error { - // 先向协调端请求文件相关的元数据 - moveResp, err := svc.coordinator.Move(objectID, storageID, userID) - if err != nil { - return fmt.Errorf("request to coordinator failed, err: %w", err) - } - if moveResp.ErrorCode != errorcode.OK { - return fmt.Errorf("coordinator operation failed, code: %s, message: %s", moveResp.ErrorCode, moveResp.Message) - } - - // 然后向代理端发送移动文件的请求 - agentClient, err := racli.NewAgentClient(moveResp.NodeID) - if err != nil { - return fmt.Errorf("create agent client to %d failed, err: %w", storageID, err) - } - defer agentClient.Close() - - switch moveResp.Redundancy { - case consts.REDUNDANCY_REP: - agentMoveResp, err := agentClient.RepMove(moveResp.Directory, moveResp.Hashes, objectID, userID, moveResp.FileSizeInBytes) - if err != nil { - return fmt.Errorf("request to agent %d failed, err: %w", storageID, err) - } - if agentMoveResp.ErrorCode != errorcode.OK { - return fmt.Errorf("agent %d operation failed, code: %s, messsage: %s", storageID, agentMoveResp.ErrorCode, agentMoveResp.Message) - } - - case consts.REDUNDANCY_EC: - agentMoveResp, err := agentClient.ECMove(moveResp.Directory, moveResp.Hashes, moveResp.IDs, *moveResp.ECName, objectID, userID, moveResp.FileSizeInBytes) - if err != nil { - return fmt.Errorf("request to agent %d failed, err: %w", storageID, err) - } - if agentMoveResp.ErrorCode != errorcode.OK { - return fmt.Errorf("agent %d operation failed, code: %s, messsage: %s", storageID, agentMoveResp.ErrorCode, agentMoveResp.Message) - } - } - - return nil +func (svc *ObjectService) Delete(userID int, objectID int) error { + // TODO + panic("not implement yet") } diff --git a/services/service.go b/services/service.go index 445fb58..ec8b0f2 100644 --- a/services/service.go +++ b/services/service.go @@ -1,7 +1,7 @@ package services import ( - racli "gitlink.org.cn/cloudream/rabbitmq/client" + racli "gitlink.org.cn/cloudream/rabbitmq/client/coordinator" ) type Service struct { diff --git a/services/storage.go b/services/storage.go new file mode 100644 index 0000000..5af297e --- /dev/null +++ b/services/storage.go @@ -0,0 +1,62 @@ +package services + +import ( + "fmt" + + racli "gitlink.org.cn/cloudream/rabbitmq/client" + "gitlink.org.cn/cloudream/utils/consts" + "gitlink.org.cn/cloudream/utils/consts/errorcode" +) + +type StorageService struct { + *Service +} + +func StorageSvc(svc *Service) *StorageService { + return &StorageService{Service: svc} +} + +func (svc *StorageService) MoveObjectToStorage(userID int, objectID int, storageID int) error { + // 先向协调端请求文件相关的元数据 + moveResp, err := svc.coordinator.Move(objectID, storageID, userID) + if err != nil { + return fmt.Errorf("request to coordinator failed, err: %w", err) + } + if moveResp.ErrorCode != errorcode.OK { + return fmt.Errorf("coordinator operation failed, code: %s, message: %s", moveResp.ErrorCode, moveResp.Message) + } + + // 然后向代理端发送移动文件的请求 + agentClient, err := racli.NewAgentClient(moveResp.NodeID) + if err != nil { + return fmt.Errorf("create agent client to %d failed, err: %w", storageID, err) + } + defer agentClient.Close() + + switch moveResp.Redundancy { + case consts.REDUNDANCY_REP: + agentMoveResp, err := agentClient.RepMove(moveResp.Directory, moveResp.Hashes, objectID, userID, moveResp.FileSizeInBytes) + if err != nil { + return fmt.Errorf("request to agent %d failed, err: %w", storageID, err) + } + if agentMoveResp.ErrorCode != errorcode.OK { + return fmt.Errorf("agent %d operation failed, code: %s, messsage: %s", storageID, agentMoveResp.ErrorCode, agentMoveResp.Message) + } + + case consts.REDUNDANCY_EC: + agentMoveResp, err := agentClient.ECMove(moveResp.Directory, moveResp.Hashes, moveResp.IDs, *moveResp.ECName, objectID, userID, moveResp.FileSizeInBytes) + if err != nil { + return fmt.Errorf("request to agent %d failed, err: %w", storageID, err) + } + if agentMoveResp.ErrorCode != errorcode.OK { + return fmt.Errorf("agent %d operation failed, code: %s, messsage: %s", storageID, agentMoveResp.ErrorCode, agentMoveResp.Message) + } + } + + return nil +} + +func (svc *StorageService) DeleteStorageObject(userID int, objectID int, storageID int) error { + // TODO + panic("not implement yet") +}