Browse Source

Merge pull request '1、增加多协议代理相关接口 2、更新字段' (#19) from feature_sjc into master

gitlink
songjc 2 years ago
parent
commit
10ac5c5122
21 changed files with 134 additions and 80 deletions
  1. +5
    -6
      agent/internal/task/storage_load_package.go
  2. +0
    -0
      client/internal/http/cache.go
  3. +45
    -0
      client/internal/http/node.go
  4. +0
    -0
      client/internal/services/cache.go
  5. +32
    -0
      client/internal/services/node.go
  6. +2
    -3
      common/pkgs/cmd/create_package.go
  7. +1
    -2
      common/pkgs/cmd/update_package.go
  8. +4
    -4
      common/pkgs/db/cache.go
  9. +0
    -12
      common/pkgs/db/model/model.go
  10. +6
    -7
      common/pkgs/db/node.go
  11. +3
    -3
      common/pkgs/ioswitch/ops/grpc.go
  12. +3
    -4
      common/pkgs/ioswitch/plans/agent_plan.go
  13. +4
    -5
      common/pkgs/ioswitch/plans/plan_builder.go
  14. +2
    -2
      common/pkgs/ioswitch/plans/plans.go
  15. +6
    -6
      common/pkgs/iterator/download_object_iterator.go
  16. +5
    -6
      common/pkgs/mq/coordinator/node.go
  17. +2
    -2
      coordinator/internal/mq/node.go
  18. +1
    -2
      scanner/internal/event/check_package_redundancy.go
  19. +2
    -3
      scanner/internal/event/clean_pinned.go
  20. +10
    -11
      scanner/internal/event/event_test.go
  21. +1
    -2
      scanner/internal/tickevent/batch_all_agent_check_cache.go

+ 5
- 6
agent/internal/task/storage_load_package.go View File

@@ -19,7 +19,6 @@ import (
"gitlink.org.cn/cloudream/storage/common/consts"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage/common/pkgs/ec"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
@@ -236,7 +235,7 @@ func (t *StorageLoadPackage) downloadECObject(coorCli *coormq.Client, ipfsCli *i
}

type downloadNodeInfo struct {
Node model.Node
Node cdssdk.Node
ObjectPinned bool
Blocks []stgmod.ObjectBlock
Distance float64
@@ -296,7 +295,7 @@ func (t *StorageLoadPackage) sortDownloadNodes(coorCli *coormq.Client, obj stgmo
}

type downloadBlock struct {
Node model.Node
Node cdssdk.Node
Block stgmod.ObjectBlock
}

@@ -324,9 +323,9 @@ func (t *StorageLoadPackage) getMinReadingBlockSolution(sortedNodes []*downloadN
return math.MaxFloat64, gotBlocks
}

func (t *StorageLoadPackage) getMinReadingObjectSolution(sortedNodes []*downloadNodeInfo, k int) (float64, *model.Node) {
func (t *StorageLoadPackage) getMinReadingObjectSolution(sortedNodes []*downloadNodeInfo, k int) (float64, *cdssdk.Node) {
dist := math.MaxFloat64
var downloadNode *model.Node
var downloadNode *cdssdk.Node
for _, n := range sortedNodes {
if n.ObjectPinned && float64(k)*n.Distance < dist {
dist = float64(k) * n.Distance
@@ -337,7 +336,7 @@ func (t *StorageLoadPackage) getMinReadingObjectSolution(sortedNodes []*download
return dist, downloadNode
}

func (t *StorageLoadPackage) getNodeDistance(node model.Node) float64 {
func (t *StorageLoadPackage) getNodeDistance(node cdssdk.Node) float64 {
if stgglb.Local.NodeID != nil {
if node.NodeID == *stgglb.Local.NodeID {
return consts.NodeDistanceSameNode


client/internal/http/cacah.go → client/internal/http/cache.go View File


+ 45
- 0
client/internal/http/node.go View File

@@ -0,0 +1,45 @@
package http

import (
"net/http"

"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

type NodeService struct {
*Server
}

func (s *Server) NodeSvc() *NodeService {
return &NodeService{
Server: s,
}
}

type GetNodesReq struct {
NodeIDs *[]cdssdk.NodeID `form:"nodeIDs" binding:"required"`
}
type GetNodesResp = cdssdk.NodeGetNodesResp

func (s *ObjectService) GetNodes(ctx *gin.Context) {
log := logger.WithField("HTTP", "Node.GetNodes")

var req GetNodesReq
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

nodes, err := s.svc.NodeSvc().GetNodes(*req.NodeIDs)
if err != nil {
log.Warnf("getting nodes: %s", err.Error())
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "get nodes failed"))
return
}

ctx.JSON(http.StatusOK, OK(GetNodesResp{Nodes: nodes}))
}

client/internal/services/cacah.go → client/internal/services/cache.go View File


+ 32
- 0
client/internal/services/node.go View File

@@ -0,0 +1,32 @@
package services

import (
"fmt"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

type NodeService struct {
*Service
}

func (svc *Service) NodeSvc() *NodeService {
return &NodeService{Service: svc}
}

func (svc *NodeService) GetNodes(nodeIDs []cdssdk.NodeID) ([]cdssdk.Node, error) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
return nil, fmt.Errorf("new coordinator client: %w", err)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

getResp, err := coorCli.GetNodes(coormq.NewGetNodes(nodeIDs))
if err != nil {
return nil, fmt.Errorf("requsting to coodinator: %w", err)
}

return getResp.Nodes, nil
}

+ 2
- 3
common/pkgs/cmd/create_package.go View File

@@ -12,7 +12,6 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
@@ -40,7 +39,7 @@ type ObjectUploadResult struct {
}

type UploadNodeInfo struct {
Node model.Node
Node cdssdk.Node
IsSameLocation bool
}

@@ -76,7 +75,7 @@ func (t *CreatePackage) Execute(ctx *UpdatePackageContext) (*CreatePackageResult
return nil, fmt.Errorf("getting user nodes: %w", err)
}

userNodes := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo {
userNodes := lo.Map(getUserNodesResp.Nodes, func(node cdssdk.Node, index int) UploadNodeInfo {
return UploadNodeInfo{
Node: node,
IsSameLocation: node.LocationID == stgglb.Local.LocationID,


+ 1
- 2
common/pkgs/cmd/update_package.go View File

@@ -8,7 +8,6 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage/common/pkgs/iterator"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
@@ -50,7 +49,7 @@ func (t *UpdatePackage) Execute(ctx *UpdatePackageContext) (*UpdatePackageResult
return nil, fmt.Errorf("getting user nodes: %w", err)
}

userNodes := lo.Map(getUserNodesResp.Nodes, func(node model.Node, index int) UploadNodeInfo {
userNodes := lo.Map(getUserNodesResp.Nodes, func(node cdssdk.Node, index int) UploadNodeInfo {
return UploadNodeInfo{
Node: node,
IsSameLocation: node.LocationID == stgglb.Local.LocationID,


+ 4
- 4
common/pkgs/db/cache.go View File

@@ -74,8 +74,8 @@ func (*CacheDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, fileHashes
}

// GetCachingFileNodes 查找缓存了指定文件的节点
func (*CacheDB) GetCachingFileNodes(ctx SQLContext, fileHash string) ([]model.Node, error) {
var x []model.Node
func (*CacheDB) GetCachingFileNodes(ctx SQLContext, fileHash string) ([]cdssdk.Node, error) {
var x []cdssdk.Node
err := sqlx.Select(ctx, &x,
"select Node.* from Cache, Node where Cache.FileHash=? and Cache.NodeID = Node.NodeID", fileHash)
return x, err
@@ -88,8 +88,8 @@ func (*CacheDB) DeleteNodeAll(ctx SQLContext, nodeID cdssdk.NodeID) error {
}

// FindCachingFileUserNodes 在缓存表中查询指定数据所在的节点
func (*CacheDB) FindCachingFileUserNodes(ctx SQLContext, userID cdssdk.NodeID, fileHash string) ([]model.Node, error) {
var x []model.Node
func (*CacheDB) FindCachingFileUserNodes(ctx SQLContext, userID cdssdk.NodeID, fileHash string) ([]cdssdk.Node, error) {
var x []cdssdk.Node
err := sqlx.Select(ctx, &x,
"select Node.* from Cache, UserNode, Node where"+
" Cache.FileHash=? and Cache.NodeID = UserNode.NodeID and"+


+ 0
- 12
common/pkgs/db/model/model.go View File

@@ -12,18 +12,6 @@ import (

// TODO 可以考虑逐步迁移到cdssdk中。迁移思路:数据对象应该包含的字段都迁移到cdssdk中,内部使用的一些特殊字段则留在这里

type Node struct {
NodeID cdssdk.NodeID `db:"NodeID" json:"nodeID"`
Name string `db:"Name" json:"name"`
LocalIP string `db:"LocalIP" json:"localIP"`
ExternalIP string `db:"ExternalIP" json:"externalIP"`
LocalGRPCPort int `db:"LocalGRPCPort" json:"localGRPCPort"`
ExternalGRPCPort int `db:"ExternalGRPCPort" json:"externalGRPCPort"`
LocationID cdssdk.LocationID `db:"LocationID" json:"locationID"`
State string `db:"State" json:"state"`
LastReportTime *time.Time `db:"LastReportTime" json:"lastReportTime"`
}

type Storage struct {
StorageID cdssdk.StorageID `db:"StorageID" json:"storageID"`
Name string `db:"Name" json:"name"`


+ 6
- 7
common/pkgs/db/node.go View File

@@ -5,7 +5,6 @@ import (

"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

type NodeDB struct {
@@ -16,21 +15,21 @@ func (db *DB) Node() *NodeDB {
return &NodeDB{DB: db}
}

func (db *NodeDB) GetByID(ctx SQLContext, nodeID cdssdk.NodeID) (model.Node, error) {
var ret model.Node
func (db *NodeDB) GetByID(ctx SQLContext, nodeID cdssdk.NodeID) (cdssdk.Node, error) {
var ret cdssdk.Node
err := sqlx.Get(ctx, &ret, "select * from Node where NodeID = ?", nodeID)
return ret, err
}

func (db *NodeDB) GetAllNodes(ctx SQLContext) ([]model.Node, error) {
var ret []model.Node
func (db *NodeDB) GetAllNodes(ctx SQLContext) ([]cdssdk.Node, error) {
var ret []cdssdk.Node
err := sqlx.Select(ctx, &ret, "select * from Node")
return ret, err
}

// GetUserNodes 根据用户id查询可用node
func (db *NodeDB) GetUserNodes(ctx SQLContext, userID cdssdk.UserID) ([]model.Node, error) {
var nodes []model.Node
func (db *NodeDB) GetUserNodes(ctx SQLContext, userID cdssdk.UserID) ([]cdssdk.Node, error) {
var nodes []cdssdk.Node
err := sqlx.Select(ctx, &nodes, "select Node.* from UserNode, Node where UserNode.NodeID = Node.NodeID and UserNode.UserID=?", userID)
return nodes, err
}


+ 3
- 3
common/pkgs/ioswitch/ops/grpc.go View File

@@ -7,16 +7,16 @@ import (

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
myio "gitlink.org.cn/cloudream/common/utils/io"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

type GRPCSend struct {
LocalID ioswitch.StreamID `json:"localID"`
RemoteID ioswitch.StreamID `json:"remoteID"`
Node model.Node `json:"node"`
Node cdssdk.Node `json:"node"`
}

func (o *GRPCSend) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
@@ -49,7 +49,7 @@ func (o *GRPCSend) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {
type GRPCFetch struct {
RemoteID ioswitch.StreamID `json:"remoteID"`
LocalID ioswitch.StreamID `json:"localID"`
Node model.Node `json:"node"`
Node cdssdk.Node `json:"node"`
}

func (o *GRPCFetch) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error {


+ 3
- 4
common/pkgs/ioswitch/plans/agent_plan.go View File

@@ -2,14 +2,13 @@ package plans

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/ops"
)

type AgentPlanBuilder struct {
owner *PlanBuilder
node model.Node
node cdssdk.Node
ops []ioswitch.Op
}

@@ -30,7 +29,7 @@ func (b *AgentPlanBuilder) Build(planID ioswitch.PlanID) (AgentPlan, error) {
}, nil
}

func (b *AgentPlanBuilder) GRCPFetch(node model.Node, str *AgentStream) *AgentStream {
func (b *AgentPlanBuilder) GRCPFetch(node cdssdk.Node, str *AgentStream) *AgentStream {
agtStr := &AgentStream{
owner: b,
info: b.owner.newStream(),
@@ -45,7 +44,7 @@ func (b *AgentPlanBuilder) GRCPFetch(node model.Node, str *AgentStream) *AgentSt
return agtStr
}

func (s *AgentStream) GRPCSend(node model.Node) *AgentStream {
func (s *AgentStream) GRPCSend(node cdssdk.Node) *AgentStream {
agtStr := &AgentStream{
owner: s.owner.owner.AtAgent(node),
info: s.owner.owner.newStream(),


+ 4
- 5
common/pkgs/ioswitch/plans/plan_builder.go View File

@@ -5,7 +5,6 @@ import (

"github.com/google/uuid"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

@@ -60,7 +59,7 @@ func (b *PlanBuilder) FromExecutor() *FromExecutorStream {
}
}

func (b *PlanBuilder) AtAgent(node model.Node) *AgentPlanBuilder {
func (b *PlanBuilder) AtAgent(node cdssdk.Node) *AgentPlanBuilder {
agtPlan, ok := b.agentPlans[node.NodeID]
if !ok {
agtPlan = &AgentPlanBuilder{
@@ -76,10 +75,10 @@ func (b *PlanBuilder) AtAgent(node model.Node) *AgentPlanBuilder {
type FromExecutorStream struct {
owner *PlanBuilder
info *StreamInfo
toNode *model.Node
toNode *cdssdk.Node
}

func (s *FromExecutorStream) ToNode(node model.Node) *AgentStream {
func (s *FromExecutorStream) ToNode(node cdssdk.Node) *AgentStream {
s.toNode = &node
return &AgentStream{
owner: s.owner.AtAgent(node),
@@ -89,7 +88,7 @@ func (s *FromExecutorStream) ToNode(node model.Node) *AgentStream {

type ToExecutorStream struct {
info *StreamInfo
fromNode *model.Node
fromNode *cdssdk.Node
}

type MultiStream struct {


+ 2
- 2
common/pkgs/ioswitch/plans/plans.go View File

@@ -1,12 +1,12 @@
package plans

import (
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
)

type AgentPlan struct {
Node model.Node
Node cdssdk.Node
Plan ioswitch.Plan
}



+ 6
- 6
common/pkgs/iterator/download_object_iterator.go View File

@@ -32,7 +32,7 @@ type IterDownloadingObject struct {
}

type DownloadNodeInfo struct {
Node model.Node
Node cdssdk.Node
ObjectPinned bool
Blocks []stgmod.ObjectBlock
Distance float64
@@ -242,7 +242,7 @@ func (iter *DownloadObjectIterator) sortDownloadNodes(coorCli *coormq.Client, ct
}

type downloadBlock struct {
Node model.Node
Node cdssdk.Node
Block stgmod.ObjectBlock
}

@@ -270,9 +270,9 @@ func (iter *DownloadObjectIterator) getMinReadingBlockSolution(sortedNodes []*Do
return math.MaxFloat64, gotBlocks
}

func (iter *DownloadObjectIterator) getMinReadingObjectSolution(sortedNodes []*DownloadNodeInfo, k int) (float64, *model.Node) {
func (iter *DownloadObjectIterator) getMinReadingObjectSolution(sortedNodes []*DownloadNodeInfo, k int) (float64, *cdssdk.Node) {
dist := math.MaxFloat64
var downloadNode *model.Node
var downloadNode *cdssdk.Node
for _, n := range sortedNodes {
if n.ObjectPinned && float64(k)*n.Distance < dist {
dist = float64(k) * n.Distance
@@ -283,7 +283,7 @@ func (iter *DownloadObjectIterator) getMinReadingObjectSolution(sortedNodes []*D
return dist, downloadNode
}

func (iter *DownloadObjectIterator) getNodeDistance(node model.Node) float64 {
func (iter *DownloadObjectIterator) getNodeDistance(node cdssdk.Node) float64 {
if stgglb.Local.NodeID != nil {
if node.NodeID == *stgglb.Local.NodeID {
return consts.NodeDistanceSameNode
@@ -297,7 +297,7 @@ func (iter *DownloadObjectIterator) getNodeDistance(node model.Node) float64 {
return consts.NodeDistanceOther
}

func downloadFile(ctx *DownloadContext, node model.Node, fileHash string) (io.ReadCloser, error) {
func downloadFile(ctx *DownloadContext, node cdssdk.Node, fileHash string) (io.ReadCloser, error) {
// 如果客户端与节点在同一个地域,则使用内网地址连接节点
nodeIP := node.ExternalIP
grpcPort := node.ExternalGRPCPort


+ 5
- 6
common/pkgs/mq/coordinator/node.go View File

@@ -3,7 +3,6 @@ package coordinator
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

type NodeService interface {
@@ -21,7 +20,7 @@ type GetUserNodes struct {
}
type GetUserNodesResp struct {
mq.MessageBodyBase
Nodes []model.Node `json:"nodes"`
Nodes []cdssdk.Node `json:"nodes"`
}

func NewGetUserNodes(userID cdssdk.UserID) *GetUserNodes {
@@ -29,7 +28,7 @@ func NewGetUserNodes(userID cdssdk.UserID) *GetUserNodes {
UserID: userID,
}
}
func NewGetUserNodesResp(nodes []model.Node) *GetUserNodesResp {
func NewGetUserNodesResp(nodes []cdssdk.Node) *GetUserNodesResp {
return &GetUserNodesResp{
Nodes: nodes,
}
@@ -47,7 +46,7 @@ type GetNodes struct {
}
type GetNodesResp struct {
mq.MessageBodyBase
Nodes []model.Node `json:"nodes"`
Nodes []cdssdk.Node `json:"nodes"`
}

func NewGetNodes(nodeIDs []cdssdk.NodeID) *GetNodes {
@@ -55,12 +54,12 @@ func NewGetNodes(nodeIDs []cdssdk.NodeID) *GetNodes {
NodeIDs: nodeIDs,
}
}
func NewGetNodesResp(nodes []model.Node) *GetNodesResp {
func NewGetNodesResp(nodes []cdssdk.Node) *GetNodesResp {
return &GetNodesResp{
Nodes: nodes,
}
}
func (r *GetNodesResp) GetNode(id cdssdk.NodeID) *model.Node {
func (r *GetNodesResp) GetNode(id cdssdk.NodeID) *cdssdk.Node {
for _, n := range r.Nodes {
if n.NodeID == id {
return &n


+ 2
- 2
coordinator/internal/mq/node.go View File

@@ -4,7 +4,7 @@ import (
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

@@ -20,7 +20,7 @@ func (svc *Service) GetUserNodes(msg *coormq.GetUserNodes) (*coormq.GetUserNodes
}

func (svc *Service) GetNodes(msg *coormq.GetNodes) (*coormq.GetNodesResp, *mq.CodeMessage) {
var nodes []model.Node
var nodes []cdssdk.Node

if msg.NodeIDs == nil {
var err error


+ 1
- 2
scanner/internal/event/check_package_redundancy.go View File

@@ -11,7 +11,6 @@ import (
"gitlink.org.cn/cloudream/common/utils/sort"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans"
agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
@@ -36,7 +35,7 @@ func NewCheckPackageRedundancy(evt *scevt.CheckPackageRedundancy) *CheckPackageR
}

type NodeLoadInfo struct {
Node model.Node
Node cdssdk.Node
LoadsRecentMonth int
LoadsRecentYear int
}


+ 2
- 3
scanner/internal/event/clean_pinned.go View File

@@ -17,7 +17,6 @@ import (
"gitlink.org.cn/cloudream/storage/common/consts"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock/reqbuilder"
"gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch/plans"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
@@ -93,7 +92,7 @@ type doingContext struct {
execCtx ExecuteContext
readerNodeIDs []cdssdk.NodeID // 近期可能访问此对象的节点
nodesSortedByReader map[cdssdk.NodeID][]nodeDist // 拥有数据的节点到每个可能访问对象的节点按距离排序
nodeInfos map[cdssdk.NodeID]*model.Node
nodeInfos map[cdssdk.NodeID]*cdssdk.Node
blockList []objectBlock // 排序后的块分布情况
nodeBlockBitmaps map[cdssdk.NodeID]*bitmap.Bitmap64 // 用位图的形式表示每一个节点上有哪些块
allBlockTypeCount int // object总共被分成了几块
@@ -341,7 +340,7 @@ func (t *CleanPinned) doOne(execCtx ExecuteContext, readerNodeIDs []cdssdk.NodeI
execCtx: execCtx,
readerNodeIDs: readerNodeIDs,
nodesSortedByReader: make(map[cdssdk.NodeID][]nodeDist),
nodeInfos: make(map[cdssdk.NodeID]*model.Node),
nodeInfos: make(map[cdssdk.NodeID]*cdssdk.Node),
nodeBlockBitmaps: make(map[cdssdk.NodeID]*bitmap.Bitmap64),
}



+ 10
- 11
scanner/internal/event/event_test.go View File

@@ -6,7 +6,6 @@ import (
"github.com/samber/lo"
. "github.com/smartystreets/goconvey/convey"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

func Test_chooseSoManyNodes(t *testing.T) {
@@ -19,8 +18,8 @@ func Test_chooseSoManyNodes(t *testing.T) {
{
title: "节点数量充足",
allNodes: []*NodeLoadInfo{
{Node: model.Node{NodeID: cdssdk.NodeID(1)}},
{Node: model.Node{NodeID: cdssdk.NodeID(2)}},
{Node: cdssdk.Node{NodeID: cdssdk.NodeID(1)}},
{Node: cdssdk.Node{NodeID: cdssdk.NodeID(2)}},
},
count: 2,
expectedNodeIDs: []cdssdk.NodeID{1, 2},
@@ -28,9 +27,9 @@ func Test_chooseSoManyNodes(t *testing.T) {
{
title: "节点数量超过",
allNodes: []*NodeLoadInfo{
{Node: model.Node{NodeID: cdssdk.NodeID(1)}},
{Node: model.Node{NodeID: cdssdk.NodeID(2)}},
{Node: model.Node{NodeID: cdssdk.NodeID(3)}},
{Node: cdssdk.Node{NodeID: cdssdk.NodeID(1)}},
{Node: cdssdk.Node{NodeID: cdssdk.NodeID(2)}},
{Node: cdssdk.Node{NodeID: cdssdk.NodeID(3)}},
},
count: 2,
expectedNodeIDs: []cdssdk.NodeID{1, 2},
@@ -38,7 +37,7 @@ func Test_chooseSoManyNodes(t *testing.T) {
{
title: "只有一个节点,节点数量不够",
allNodes: []*NodeLoadInfo{
{Node: model.Node{NodeID: cdssdk.NodeID(1)}},
{Node: cdssdk.Node{NodeID: cdssdk.NodeID(1)}},
},
count: 3,
expectedNodeIDs: []cdssdk.NodeID{1, 1, 1},
@@ -46,8 +45,8 @@ func Test_chooseSoManyNodes(t *testing.T) {
{
title: "多个同地区节点,节点数量不够",
allNodes: []*NodeLoadInfo{
{Node: model.Node{NodeID: cdssdk.NodeID(1)}},
{Node: model.Node{NodeID: cdssdk.NodeID(2)}},
{Node: cdssdk.Node{NodeID: cdssdk.NodeID(1)}},
{Node: cdssdk.Node{NodeID: cdssdk.NodeID(2)}},
},
count: 5,
expectedNodeIDs: []cdssdk.NodeID{1, 1, 1, 2, 2},
@@ -55,8 +54,8 @@ func Test_chooseSoManyNodes(t *testing.T) {
{
title: "节点数量不够,且在不同地区",
allNodes: []*NodeLoadInfo{
{Node: model.Node{NodeID: cdssdk.NodeID(1), LocationID: cdssdk.LocationID(1)}},
{Node: model.Node{NodeID: cdssdk.NodeID(2), LocationID: cdssdk.LocationID(2)}},
{Node: cdssdk.Node{NodeID: cdssdk.NodeID(1), LocationID: cdssdk.LocationID(1)}},
{Node: cdssdk.Node{NodeID: cdssdk.NodeID(2), LocationID: cdssdk.LocationID(2)}},
},
count: 5,
expectedNodeIDs: []cdssdk.NodeID{1, 2, 1, 2, 1},


+ 1
- 2
scanner/internal/tickevent/batch_all_agent_check_cache.go View File

@@ -4,7 +4,6 @@ import (
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
"gitlink.org.cn/cloudream/storage/scanner/internal/event"
)
@@ -31,7 +30,7 @@ func (e *BatchAllAgentCheckCache) Execute(ctx ExecuteContext) {
return
}

e.nodeIDs = lo.Map(nodes, func(node model.Node, index int) cdssdk.NodeID { return node.NodeID })
e.nodeIDs = lo.Map(nodes, func(node cdssdk.Node, index int) cdssdk.NodeID { return node.NodeID })

log.Debugf("new check start, get all nodes")
}


Loading…
Cancel
Save