Browse Source

Merge pull request '增加缓存移动功能' (#4) from feature_gxh into master

gitlink
baohan 2 years ago
parent
commit
74a185ce46
6 changed files with 210 additions and 1 deletions
  1. +32
    -0
      internal/cmdline/cache.go
  2. +64
    -0
      internal/http/cacah.go
  3. +4
    -0
      internal/http/server.go
  4. +55
    -0
      internal/http/storage.go
  5. +55
    -0
      internal/services/cacah.go
  6. +0
    -1
      internal/services/storage.go

+ 32
- 0
internal/cmdline/cache.go View File

@@ -0,0 +1,32 @@
package cmdline

import (
"fmt"
"time"
)

func CacheMovePackage(ctx CommandContext, packageID int64, nodeID int64) error {
taskID, err := ctx.Cmdline.Svc.CacheSvc().StartCacheMovePackage(0, packageID, nodeID)
if err != nil {
return fmt.Errorf("start cache moving package: %w", err)
}

for {
complete, err := ctx.Cmdline.Svc.CacheSvc().WaitCacheMovePackage(nodeID, taskID, time.Second*10)
if complete {
if err != nil {
return fmt.Errorf("moving complete with: %w", err)
}

return nil
}

if err != nil {
return fmt.Errorf("wait moving: %w", err)
}
}
}

func init() {
commands.Add(CacheMovePackage, "cache", "move")
}

+ 64
- 0
internal/http/cacah.go View File

@@ -0,0 +1,64 @@
package http

import (
"net/http"
"time"

"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
)

type CacheService struct {
*Server
}

func (s *Server) CacheSvc() *CacheService {
return &CacheService{
Server: s,
}
}

type CacheMovePackageReq struct {
UserID *int64 `json:"userID" binding:"required"`
PackageID *int64 `json:"packageID" binding:"required"`
NodeID *int64 `json:"nodeID" binding:"required"`
}

func (s *CacheService) MovePackage(ctx *gin.Context) {
log := logger.WithField("HTTP", "Cache.LoadPackage")

var req CacheMovePackageReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

taskID, err := s.svc.CacheSvc().StartCacheMovePackage(*req.UserID, *req.PackageID, *req.NodeID)
if err != nil {
log.Warnf("start cache move package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "cache move package failed"))
return
}

for {
complete, err := s.svc.CacheSvc().WaitCacheMovePackage(*req.NodeID, taskID, time.Second*10)
if complete {
if err != nil {
log.Warnf("moving complete with: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "cache move package failed"))
return
}

ctx.JSON(http.StatusOK, OK(nil))
return
}

if err != nil {
log.Warnf("wait moving: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "cache move package failed"))
return
}
}
}

+ 4
- 0
internal/http/server.go View File

@@ -39,8 +39,12 @@ func (s *Server) Serve() error {

func (s *Server) initRouters() {
s.engine.GET("/object/download", s.ObjectSvc().Download)

s.engine.POST("/package/upload", s.PackageSvc().Upload)
s.engine.POST("/package/delete", s.PackageSvc().Delete)

s.engine.POST("/storage/loadPackage", s.StorageSvc().LoadPackage)
s.engine.POST("/storage/createPackage", s.StorageSvc().CreatePackage)

s.engine.POST("/cache/movePackage", s.CacheSvc().MovePackage)
}

+ 55
- 0
internal/http/storage.go View File

@@ -6,6 +6,7 @@ import (

"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/logger"
)

@@ -62,3 +63,57 @@ func (s *StorageService) LoadPackage(ctx *gin.Context) {
}
}
}

type StorageCreatePackageReq struct {
UserID *int64 `json:"userID" binding:"required"`
StorageID *int64 `json:"storageID" binding:"required"`
Path string `json:"path" binding:"required"`
BucketID *int64 `json:"bucketID" binding:"required"`
Name string `json:"name" binding:"required"`
Redundancy models.TypedRedundancyInfo `json:"redundancy" binding:"required"`
}

type StorageCreatePackageResp struct {
PackageID int64 `json:"packageID"`
}

func (s *StorageService) CreatePackage(ctx *gin.Context) {
log := logger.WithField("HTTP", "Storage.CreatePackage")

var req StorageCreatePackageReq
if err := ctx.ShouldBindJSON(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

nodeID, taskID, err := s.svc.StorageSvc().StartStorageCreatePackage(
*req.UserID, *req.BucketID, req.Name, *req.StorageID, req.Path, req.Redundancy)
if err != nil {
log.Warnf("start storage create package: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed"))
return
}

for {
complete, packageID, err := s.svc.StorageSvc().WaitStorageCreatePackage(nodeID, taskID, time.Second*10)
if complete {
if err != nil {
log.Warnf("creating complete with: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed"))
return
}

ctx.JSON(http.StatusOK, OK(StorageCreatePackageResp{
PackageID: packageID,
}))
return
}

if err != nil {
log.Warnf("wait creating: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "storage create package failed"))
return
}
}
}

+ 55
- 0
internal/services/cacah.go View File

@@ -0,0 +1,55 @@
package services

import (
"fmt"
"time"

"gitlink.org.cn/cloudream/storage-common/globals"
agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
)

type CacheService struct {
*Service
}

func (svc *Service) CacheSvc() *CacheService {
return &CacheService{Service: svc}
}

func (svc *CacheService) StartCacheMovePackage(userID int64, packageID int64, nodeID int64) (string, error) {
agentCli, err := globals.AgentMQPool.Acquire(nodeID)
if err != nil {
return "", fmt.Errorf("new agent client: %w", err)
}
defer agentCli.Close()

startResp, err := agentCli.StartCacheMovePackage(agtmq.NewStartCacheMovePackage(userID, packageID))
if err != nil {
return "", fmt.Errorf("start cache move package: %w", err)
}

return startResp.TaskID, nil
}

func (svc *CacheService) WaitCacheMovePackage(nodeID int64, taskID string, waitTimeout time.Duration) (bool, error) {
agentCli, err := globals.AgentMQPool.Acquire(nodeID)
if err != nil {
return true, fmt.Errorf("new agent client: %w", err)
}
defer agentCli.Close()

waitResp, err := agentCli.WaitCacheMovePackage(agtmq.NewWaitCacheMovePackage(taskID, waitTimeout.Milliseconds()))
if err != nil {
return true, fmt.Errorf("wait cache move package: %w", err)
}

if !waitResp.IsComplete {
return false, nil
}

if waitResp.Error != "" {
return true, fmt.Errorf("%s", waitResp.Error)
}

return true, nil
}

+ 0
- 1
internal/services/storage.go View File

@@ -30,7 +30,6 @@ func (svc *StorageService) WaitStorageLoadPackage(taskID string, waitTimeout tim
return true, tsk.Error()
}
return false, nil

}

func (svc *StorageService) DeleteStoragePackage(userID int64, packageID int64, storageID int64) error {


Loading…
Cancel
Save