From beaa925a6263bb9b49d95d957bc9fe25a52c1524 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 14 Jun 2023 16:21:52 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B6=89=E5=8F=8A=E5=88=B0=E5=A4=9A=E4=B8=AA?= =?UTF-8?q?=E8=A1=A8=E7=9A=84=E5=85=83=E6=95=B0=E6=8D=AE=E6=94=B9=E5=8A=A8?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=BA=8B=E5=8A=A1=EF=BC=8C=E4=BF=9D=E8=AF=81?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=AE=8C=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/event/check_rep_count.go | 119 ++++++++++++++---------------- 1 file changed, 57 insertions(+), 62 deletions(-) diff --git a/internal/event/check_rep_count.go b/internal/event/check_rep_count.go index cdbecae..d7bc395 100644 --- a/internal/event/check_rep_count.go +++ b/internal/event/check_rep_count.go @@ -1,11 +1,9 @@ package event import ( - "database/sql" "fmt" "math" - "github.com/jmoiron/sqlx" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/consts" "gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder" @@ -82,84 +80,81 @@ func (t *CheckRepCount) Execute(execCtx ExecuteContext) { func (t *CheckRepCount) checkOneRepCount(fileHash string, execCtx ExecuteContext) ([]int, error) { log := logger.WithType[CheckRepCount]("Event") + sqlCtx := execCtx.Args.DB.SQLCtx() var updatedNodeIDs []int - err := execCtx.Args.DB.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { - // 计算所需的最少备份数: - // 1. ObjectRep中期望备份数的最大值 - // 2. 如果ObjectBlock存在对此文件的引用,则至少为1 - repMaxCnt, err := execCtx.Args.DB.ObjectRep().GetFileMaxRepCount(tx, fileHash) - if err != nil { - return fmt.Errorf("get file max rep count failed, err: %w", err) - } + // 计算所需的最少备份数: + // 1. ObjectRep中期望备份数的最大值 + // 2. 如果ObjectBlock存在对此文件的引用,则至少为1 + repMaxCnt, err := execCtx.Args.DB.ObjectRep().GetFileMaxRepCount(sqlCtx, fileHash) + if err != nil { + return nil, fmt.Errorf("get file max rep count failed, err: %w", err) + } - blkCnt, err := execCtx.Args.DB.ObjectBlock().CountBlockWithHash(tx, fileHash) - if err != nil { - return fmt.Errorf("count block with hash failed, err: %w", err) - } + blkCnt, err := execCtx.Args.DB.ObjectBlock().CountBlockWithHash(sqlCtx, fileHash) + if err != nil { + return nil, fmt.Errorf("count block with hash failed, err: %w", err) + } - needRepCount := mymath.Max(repMaxCnt, mymath.Min(1, blkCnt)) + needRepCount := mymath.Max(repMaxCnt, mymath.Min(1, blkCnt)) - repNodes, err := execCtx.Args.DB.Cache().GetCachingFileNodes(tx, fileHash) - if err != nil { - return fmt.Errorf("get caching file nodes failed, err: %w", err) - } + repNodes, err := execCtx.Args.DB.Cache().GetCachingFileNodes(sqlCtx, fileHash) + if err != nil { + return nil, fmt.Errorf("get caching file nodes failed, err: %w", err) + } - allNodes, err := execCtx.Args.DB.Node().GetAllNodes(tx) - if err != nil { - return fmt.Errorf("get all nodes failed, err: %w", err) - } + allNodes, err := execCtx.Args.DB.Node().GetAllNodes(sqlCtx) + if err != nil { + return nil, fmt.Errorf("get all nodes failed, err: %w", err) + } - var normalNodes, unavaiNodes []model.Node - for _, node := range repNodes { - if node.State == consts.NODE_STATE_NORMAL { - normalNodes = append(normalNodes, node) - } else if node.State == consts.NODE_STATE_UNAVAILABLE { - unavaiNodes = append(unavaiNodes, node) - } + var normalNodes, unavaiNodes []model.Node + for _, node := range repNodes { + if node.State == consts.NODE_STATE_NORMAL { + normalNodes = append(normalNodes, node) + } else if node.State == consts.NODE_STATE_UNAVAILABLE { + unavaiNodes = append(unavaiNodes, node) } + } - // 如果Available的备份数超过期望备份数,则让一些节点退出 - if len(normalNodes) > needRepCount { - delNodes := chooseDeleteAvaiRepNodes(allNodes, normalNodes, len(normalNodes)-needRepCount) - for _, node := range delNodes { - err := execCtx.Args.DB.Cache().SetTemp(tx, fileHash, node.NodeID) - if err != nil { - return fmt.Errorf("change cache state failed, err: %w", err) - } - updatedNodeIDs = append(updatedNodeIDs, node.NodeID) + // 如果Available的备份数超过期望备份数,则让一些节点退出 + if len(normalNodes) > needRepCount { + delNodes := chooseDeleteAvaiRepNodes(allNodes, normalNodes, len(normalNodes)-needRepCount) + for _, node := range delNodes { + err := execCtx.Args.DB.Cache().SetTemp(sqlCtx, fileHash, node.NodeID) + if err != nil { + return nil, fmt.Errorf("change cache state failed, err: %w", err) } - return nil + updatedNodeIDs = append(updatedNodeIDs, node.NodeID) } + return updatedNodeIDs, nil + } - // 因为总备份数不够,而需要增加的备份数 - add1 := mymath.Max(0, needRepCount-len(repNodes)) + // 因为总备份数不够,而需要增加的备份数 + add1 := mymath.Max(0, needRepCount-len(repNodes)) - // 因为Available的备份数占比过少,而需要增加的备份数 - minAvaiNodeCnt := int(math.Ceil(float64(config.Cfg().MinAvailableRepProportion) * float64(needRepCount))) - add2 := mymath.Max(0, minAvaiNodeCnt-len(normalNodes)) + // 因为Available的备份数占比过少,而需要增加的备份数 + minAvaiNodeCnt := int(math.Ceil(float64(config.Cfg().MinAvailableRepProportion) * float64(needRepCount))) + add2 := mymath.Max(0, minAvaiNodeCnt-len(normalNodes)) - // 最终需要增加的备份数,是以上两种情况的最大值 - finalAddCount := mymath.Max(add1, add2) + // 最终需要增加的备份数,是以上两种情况的最大值 + finalAddCount := mymath.Max(add1, add2) - if finalAddCount > 0 { - newNodes := chooseNewRepNodes(allNodes, repNodes, finalAddCount) - if len(newNodes) < finalAddCount { - log.WithField("FileHash", fileHash).Warnf("need %d more rep nodes, but get only %d nodes", finalAddCount, len(newNodes)) - // TODO 节点数不够,进行一个告警 - } + if finalAddCount > 0 { + newNodes := chooseNewRepNodes(allNodes, repNodes, finalAddCount) + if len(newNodes) < finalAddCount { + log.WithField("FileHash", fileHash).Warnf("need %d more rep nodes, but get only %d nodes", finalAddCount, len(newNodes)) + // TODO 节点数不够,进行一个告警 + } - for _, node := range newNodes { - err := execCtx.Args.DB.Cache().CreatePinned(tx, fileHash, node.NodeID) - if err != nil { - return fmt.Errorf("create cache failed, err: %w", err) - } - updatedNodeIDs = append(updatedNodeIDs, node.NodeID) + for _, node := range newNodes { + err := execCtx.Args.DB.Cache().CreatePinned(sqlCtx, fileHash, node.NodeID) + if err != nil { + return nil, fmt.Errorf("create cache failed, err: %w", err) } + updatedNodeIDs = append(updatedNodeIDs, node.NodeID) } - - return nil - }) + } return updatedNodeIDs, err }