| @@ -273,21 +273,20 @@ func PackageDeletePackage(ctx CommandContext, packageID int64) error { | |||
| return nil | |||
| } | |||
| func PackageGetCacheNodesByPackage(ctx CommandContext, packageID int64, userID int64) error { | |||
| nodeIDs, redunancyType, err := ctx.Cmdline.Svc.PackageSvc().GetCacheNodesByPackage(userID, packageID) | |||
| fmt.Printf("nodeIDs: %v\n", nodeIDs) | |||
| fmt.Printf("redunancyType: %v\n", redunancyType) | |||
| func PackageGetCachedNodes(ctx CommandContext, packageID int64, userID int64) error { | |||
| resp, err := ctx.Cmdline.Svc.PackageSvc().GetCachedNodes(userID, packageID) | |||
| fmt.Printf("resp: %v\n", resp) | |||
| if err != nil { | |||
| return fmt.Errorf("get cache nodes by packageID %d failed, err: %w", packageID, err) | |||
| return fmt.Errorf("get package %d cached nodes failed, err: %w", packageID, err) | |||
| } | |||
| return nil | |||
| } | |||
| func PackageGetStorageNodesByPackage(ctx CommandContext, packageID int64, userID int64) error { | |||
| nodeIDs, err := ctx.Cmdline.Svc.PackageSvc().GetStorageNodesByPackage(userID, packageID) | |||
| func PackageGetLoadedNodes(ctx CommandContext, packageID int64, userID int64) error { | |||
| nodeIDs, err := ctx.Cmdline.Svc.PackageSvc().GetLoadedNodes(userID, packageID) | |||
| fmt.Printf("nodeIDs: %v\n", nodeIDs) | |||
| if err != nil { | |||
| return fmt.Errorf("get storage nodes by packageID %d failed, err: %w", packageID, err) | |||
| return fmt.Errorf("get package %d loaded nodes failed, err: %w", packageID, err) | |||
| } | |||
| return nil | |||
| } | |||
| @@ -307,7 +306,7 @@ func init() { | |||
| commands.MustAdd(PackageDeletePackage, "pkg", "delete") | |||
| commands.MustAdd(PackageGetCacheNodesByPackage, "pkg", "cache", "nodes") | |||
| commands.MustAdd(PackageGetCachedNodes, "pkg", "cached") | |||
| commands.MustAdd(PackageGetStorageNodesByPackage, "pkg", "storage", "nodes") | |||
| commands.MustAdd(PackageGetLoadedNodes, "pkg", "loaded") | |||
| } | |||
| @@ -172,65 +172,61 @@ func (s *PackageService) Delete(ctx *gin.Context) { | |||
| ctx.JSON(http.StatusOK, OK(nil)) | |||
| } | |||
| type PackageGetCacheNodeIDs struct { | |||
| type GetCachedNodesReq struct { | |||
| UserID *int64 `json:"userID" binding:"required"` | |||
| PackageID *int64 `json:"packageID" binding:"required"` | |||
| } | |||
| type GetCacheNodesByPackageResp struct { | |||
| NodeIDs []int64 `json:"nodeIDs"` | |||
| RedunancyType string `json:"redunancyType,string"` | |||
| type GetCachedNodesResp struct { | |||
| models.PackageCachingInfo | |||
| } | |||
| func (s *PackageService) GetCacheNodeIDs(ctx *gin.Context) { | |||
| log := logger.WithField("HTTP", "Package.GetCacheNodeIDs") | |||
| func (s *PackageService) GetCachedNodes(ctx *gin.Context) { | |||
| log := logger.WithField("HTTP", "Package.GetCachedNodes") | |||
| var req PackageGetCacheNodeIDs | |||
| var req GetCachedNodesReq | |||
| if err := ctx.ShouldBindJSON(&req); err != nil { | |||
| log.Warnf("binding body: %s", err.Error()) | |||
| ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | |||
| return | |||
| } | |||
| nodeIDs, redunancyType, err := s.svc.PackageSvc().GetCacheNodesByPackage(*req.UserID, *req.PackageID) | |||
| resp, err := s.svc.PackageSvc().GetCachedNodes(*req.UserID, *req.PackageID) | |||
| if err != nil { | |||
| log.Warnf("get cache nodes by packageID failed: %s", err.Error()) | |||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get cache nodes by packageID failed")) | |||
| log.Warnf("get package cached nodes failed: %s", err.Error()) | |||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package cached nodes failed")) | |||
| return | |||
| } | |||
| ctx.JSON(http.StatusOK, OK(GetCacheNodesByPackageResp{ | |||
| NodeIDs: nodeIDs, | |||
| RedunancyType: redunancyType, | |||
| })) | |||
| ctx.JSON(http.StatusOK, OK(GetCachedNodesResp{resp})) | |||
| } | |||
| type PackageGetStorageNodeIDs struct { | |||
| type GetLoadedNodesReq struct { | |||
| UserID *int64 `json:"userID" binding:"required"` | |||
| PackageID *int64 `json:"packageID" binding:"required"` | |||
| } | |||
| type GetStorageNodesByPackageResp struct { | |||
| type GetLoadedNodesResp struct { | |||
| NodeIDs []int64 `json:"nodeIDs"` | |||
| } | |||
| func (s *PackageService) GetStorageNodeIDs(ctx *gin.Context) { | |||
| log := logger.WithField("HTTP", "Package.GetStorageNodeIDs") | |||
| func (s *PackageService) GetLoadedNodes(ctx *gin.Context) { | |||
| log := logger.WithField("HTTP", "Package.GetLoadedNodes") | |||
| var req PackageGetStorageNodeIDs | |||
| var req GetLoadedNodesReq | |||
| if err := ctx.ShouldBindJSON(&req); err != nil { | |||
| log.Warnf("binding body: %s", err.Error()) | |||
| ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | |||
| return | |||
| } | |||
| nodeIDs, err := s.svc.PackageSvc().GetStorageNodesByPackage(*req.UserID, *req.PackageID) | |||
| nodeIDs, err := s.svc.PackageSvc().GetLoadedNodes(*req.UserID, *req.PackageID) | |||
| if err != nil { | |||
| log.Warnf("get storage nodes by packageID failed: %s", err.Error()) | |||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get storage nodes by packageID failed")) | |||
| log.Warnf("get package loaded nodes failed: %s", err.Error()) | |||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get package loaded nodes failed")) | |||
| return | |||
| } | |||
| ctx.JSON(http.StatusOK, OK(GetStorageNodesByPackageResp{ | |||
| ctx.JSON(http.StatusOK, OK(GetLoadedNodesResp{ | |||
| NodeIDs: nodeIDs, | |||
| })) | |||
| } | |||
| @@ -42,8 +42,8 @@ func (s *Server) initRouters() { | |||
| s.engine.POST("/package/upload", s.PackageSvc().Upload) | |||
| s.engine.POST("/package/delete", s.PackageSvc().Delete) | |||
| s.engine.GET("/package/getCacheNodeIDs", s.PackageSvc().GetCacheNodeIDs) | |||
| s.engine.GET("/package/getStorageNodeIDs", s.PackageSvc().GetStorageNodeIDs) | |||
| s.engine.GET("/package/getCachedNodes", s.PackageSvc().GetCachedNodes) | |||
| s.engine.GET("/package/getLoadedNodes", s.PackageSvc().GetLoadedNodes) | |||
| s.engine.POST("/storage/loadPackage", s.StorageSvc().LoadPackage) | |||
| s.engine.POST("/storage/createPackage", s.StorageSvc().CreatePackage) | |||
| @@ -219,30 +219,36 @@ func (svc *PackageService) DeletePackage(userID int64, packageID int64) error { | |||
| return nil | |||
| } | |||
| func (svc *PackageService) GetCacheNodesByPackage(userID int64, packageID int64) ([]int64, string, error) { | |||
| func (svc *PackageService) GetCachedNodes(userID int64, packageID int64) (models.PackageCachingInfo, error) { | |||
| coorCli, err := globals.CoordinatorMQPool.Acquire() | |||
| if err != nil { | |||
| return nil, "", fmt.Errorf("new coordinator client: %w", err) | |||
| return models.PackageCachingInfo{}, fmt.Errorf("new coordinator client: %w", err) | |||
| } | |||
| defer coorCli.Close() | |||
| resp, err := coorCli.GetCacheNodesByPackage(coormq.NewGetCacheNodesByPackage(userID, packageID)) | |||
| resp, err := coorCli.GetPackageCachedNodes(coormq.NewGetPackageCachedNodes(userID, packageID)) | |||
| if err != nil { | |||
| return nil, "", fmt.Errorf("get node by package: %w", err) | |||
| return models.PackageCachingInfo{}, fmt.Errorf("get package cached nodes: %w", err) | |||
| } | |||
| return resp.NodeIDs, resp.RedundancyType, nil | |||
| tmp := models.PackageCachingInfo{ | |||
| NodeInfos: resp.NodeInfos, | |||
| PackageSize: resp.PackageSize, | |||
| RedunancyType: resp.RedunancyType, | |||
| } | |||
| return tmp, nil | |||
| } | |||
| func (svc *PackageService) GetStorageNodesByPackage(userID int64, packageID int64) ([]int64, error) { | |||
| func (svc *PackageService) GetLoadedNodes(userID int64, packageID int64) ([]int64, error) { | |||
| coorCli, err := globals.CoordinatorMQPool.Acquire() | |||
| if err != nil { | |||
| return nil, fmt.Errorf("new coordinator client: %w", err) | |||
| } | |||
| defer coorCli.Close() | |||
| resp, err := coorCli.GetStorageNodesByPackage(coormq.NewGetStorageNodesByPackage(userID, packageID)) | |||
| resp, err := coorCli.GetPackageLoadedNodes(coormq.NewGetPackageLoadedNodes(userID, packageID)) | |||
| if err != nil { | |||
| return nil, fmt.Errorf("get node by package: %w", err) | |||
| return nil, fmt.Errorf("get package loaded nodes: %w", err) | |||
| } | |||
| return resp.NodeIDs, nil | |||
| } | |||