Browse Source

涉及到多个表的元数据改动增加事务,保证数据完整

gitlink
Sydonian 2 years ago
parent
commit
beaa925a62
1 changed files with 57 additions and 62 deletions
  1. +57
    -62
      internal/event/check_rep_count.go

+ 57
- 62
internal/event/check_rep_count.go View File

@@ -1,11 +1,9 @@
package event

import (
"database/sql"
"fmt"
"math"

"github.com/jmoiron/sqlx"
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/consts"
"gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder"
@@ -82,84 +80,81 @@ func (t *CheckRepCount) Execute(execCtx ExecuteContext) {

func (t *CheckRepCount) checkOneRepCount(fileHash string, execCtx ExecuteContext) ([]int, error) {
log := logger.WithType[CheckRepCount]("Event")
sqlCtx := execCtx.Args.DB.SQLCtx()

var updatedNodeIDs []int
err := execCtx.Args.DB.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
// 计算所需的最少备份数:
// 1. ObjectRep中期望备份数的最大值
// 2. 如果ObjectBlock存在对此文件的引用,则至少为1
repMaxCnt, err := execCtx.Args.DB.ObjectRep().GetFileMaxRepCount(tx, fileHash)
if err != nil {
return fmt.Errorf("get file max rep count failed, err: %w", err)
}
// 计算所需的最少备份数:
// 1. ObjectRep中期望备份数的最大值
// 2. 如果ObjectBlock存在对此文件的引用,则至少为1
repMaxCnt, err := execCtx.Args.DB.ObjectRep().GetFileMaxRepCount(sqlCtx, fileHash)
if err != nil {
return nil, fmt.Errorf("get file max rep count failed, err: %w", err)
}

blkCnt, err := execCtx.Args.DB.ObjectBlock().CountBlockWithHash(tx, fileHash)
if err != nil {
return fmt.Errorf("count block with hash failed, err: %w", err)
}
blkCnt, err := execCtx.Args.DB.ObjectBlock().CountBlockWithHash(sqlCtx, fileHash)
if err != nil {
return nil, fmt.Errorf("count block with hash failed, err: %w", err)
}

needRepCount := mymath.Max(repMaxCnt, mymath.Min(1, blkCnt))
needRepCount := mymath.Max(repMaxCnt, mymath.Min(1, blkCnt))

repNodes, err := execCtx.Args.DB.Cache().GetCachingFileNodes(tx, fileHash)
if err != nil {
return fmt.Errorf("get caching file nodes failed, err: %w", err)
}
repNodes, err := execCtx.Args.DB.Cache().GetCachingFileNodes(sqlCtx, fileHash)
if err != nil {
return nil, fmt.Errorf("get caching file nodes failed, err: %w", err)
}

allNodes, err := execCtx.Args.DB.Node().GetAllNodes(tx)
if err != nil {
return fmt.Errorf("get all nodes failed, err: %w", err)
}
allNodes, err := execCtx.Args.DB.Node().GetAllNodes(sqlCtx)
if err != nil {
return nil, fmt.Errorf("get all nodes failed, err: %w", err)
}

var normalNodes, unavaiNodes []model.Node
for _, node := range repNodes {
if node.State == consts.NODE_STATE_NORMAL {
normalNodes = append(normalNodes, node)
} else if node.State == consts.NODE_STATE_UNAVAILABLE {
unavaiNodes = append(unavaiNodes, node)
}
var normalNodes, unavaiNodes []model.Node
for _, node := range repNodes {
if node.State == consts.NODE_STATE_NORMAL {
normalNodes = append(normalNodes, node)
} else if node.State == consts.NODE_STATE_UNAVAILABLE {
unavaiNodes = append(unavaiNodes, node)
}
}

// 如果Available的备份数超过期望备份数,则让一些节点退出
if len(normalNodes) > needRepCount {
delNodes := chooseDeleteAvaiRepNodes(allNodes, normalNodes, len(normalNodes)-needRepCount)
for _, node := range delNodes {
err := execCtx.Args.DB.Cache().SetTemp(tx, fileHash, node.NodeID)
if err != nil {
return fmt.Errorf("change cache state failed, err: %w", err)
}
updatedNodeIDs = append(updatedNodeIDs, node.NodeID)
// 如果Available的备份数超过期望备份数,则让一些节点退出
if len(normalNodes) > needRepCount {
delNodes := chooseDeleteAvaiRepNodes(allNodes, normalNodes, len(normalNodes)-needRepCount)
for _, node := range delNodes {
err := execCtx.Args.DB.Cache().SetTemp(sqlCtx, fileHash, node.NodeID)
if err != nil {
return nil, fmt.Errorf("change cache state failed, err: %w", err)
}
return nil
updatedNodeIDs = append(updatedNodeIDs, node.NodeID)
}
return updatedNodeIDs, nil
}

// 因为总备份数不够,而需要增加的备份数
add1 := mymath.Max(0, needRepCount-len(repNodes))
// 因为总备份数不够,而需要增加的备份数
add1 := mymath.Max(0, needRepCount-len(repNodes))

// 因为Available的备份数占比过少,而需要增加的备份数
minAvaiNodeCnt := int(math.Ceil(float64(config.Cfg().MinAvailableRepProportion) * float64(needRepCount)))
add2 := mymath.Max(0, minAvaiNodeCnt-len(normalNodes))
// 因为Available的备份数占比过少,而需要增加的备份数
minAvaiNodeCnt := int(math.Ceil(float64(config.Cfg().MinAvailableRepProportion) * float64(needRepCount)))
add2 := mymath.Max(0, minAvaiNodeCnt-len(normalNodes))

// 最终需要增加的备份数,是以上两种情况的最大值
finalAddCount := mymath.Max(add1, add2)
// 最终需要增加的备份数,是以上两种情况的最大值
finalAddCount := mymath.Max(add1, add2)

if finalAddCount > 0 {
newNodes := chooseNewRepNodes(allNodes, repNodes, finalAddCount)
if len(newNodes) < finalAddCount {
log.WithField("FileHash", fileHash).Warnf("need %d more rep nodes, but get only %d nodes", finalAddCount, len(newNodes))
// TODO 节点数不够,进行一个告警
}
if finalAddCount > 0 {
newNodes := chooseNewRepNodes(allNodes, repNodes, finalAddCount)
if len(newNodes) < finalAddCount {
log.WithField("FileHash", fileHash).Warnf("need %d more rep nodes, but get only %d nodes", finalAddCount, len(newNodes))
// TODO 节点数不够,进行一个告警
}

for _, node := range newNodes {
err := execCtx.Args.DB.Cache().CreatePinned(tx, fileHash, node.NodeID)
if err != nil {
return fmt.Errorf("create cache failed, err: %w", err)
}
updatedNodeIDs = append(updatedNodeIDs, node.NodeID)
for _, node := range newNodes {
err := execCtx.Args.DB.Cache().CreatePinned(sqlCtx, fileHash, node.NodeID)
if err != nil {
return nil, fmt.Errorf("create cache failed, err: %w", err)
}
updatedNodeIDs = append(updatedNodeIDs, node.NodeID)
}

return nil
})
}

return updatedNodeIDs, err
}


Loading…
Cancel
Save