diff --git a/internal/event/agent_check_cache.go b/internal/event/agent_check_cache.go index 458f4c1..9c30cd1 100644 --- a/internal/event/agent_check_cache.go +++ b/internal/event/agent_check_cache.go @@ -6,7 +6,6 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/db/model" - mysql "gitlink.org.cn/cloudream/db/sql" "gitlink.org.cn/cloudream/scanner/internal/config" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" @@ -57,7 +56,7 @@ func (t *AgentCheckCache) Execute(execCtx ExecuteContext) { if t.FileHashes == nil { var err error - caches, err = mysql.Cache.GetNodeCaches(execCtx.Args.DB.SQLCtx(), t.NodeID) + caches, err = execCtx.Args.DB.Cache().GetNodeCaches(execCtx.Args.DB.SQLCtx(), t.NodeID) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("get node caches failed, err: %s", err.Error()) return @@ -66,7 +65,7 @@ func (t *AgentCheckCache) Execute(execCtx ExecuteContext) { } else { for _, hash := range t.FileHashes { - ch, err := mysql.Cache.Get(execCtx.Args.DB.SQLCtx(), hash, t.NodeID) + ch, err := execCtx.Args.DB.Cache().Get(execCtx.Args.DB.SQLCtx(), hash, t.NodeID) // 记录不存在则跳过 if err == sql.ErrNoRows { continue diff --git a/internal/event/agent_check_state.go b/internal/event/agent_check_state.go index fcefd84..c5f38db 100644 --- a/internal/event/agent_check_state.go +++ b/internal/event/agent_check_state.go @@ -61,7 +61,7 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { return } - caches, err := mysql.Cache.GetNodeCaches(execCtx.Args.DB.SQLCtx(), t.NodeID) + caches, err := execCtx.Args.DB.Cache().GetNodeCaches(execCtx.Args.DB.SQLCtx(), t.NodeID) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("get node caches failed, err: %s", err.Error()) return diff --git a/internal/event/check_cache.go b/internal/event/check_cache.go index 2760d02..0c5a839 100644 --- a/internal/event/check_cache.go +++ b/internal/event/check_cache.go @@ -52,12 +52,12 @@ func (t *CheckCache) Execute(execCtx ExecuteContext) { return nil } - caches, err := mysql.Cache.GetNodeCaches(tx, t.NodeID) + caches, err := execCtx.Args.DB.Cache().GetNodeCaches(tx, t.NodeID) if err != nil { return fmt.Errorf("get node caches failed, err: %w", err) } - err = mysql.Cache.DeleteNodeAll(tx, t.NodeID) + err = execCtx.Args.DB.Cache().DeleteNodeAll(tx, t.NodeID) if err != nil { return fmt.Errorf("delete node all caches failed, err: %w", err) } diff --git a/internal/event/check_rep_count.go b/internal/event/check_rep_count.go index ffdc37e..cf6cb62 100644 --- a/internal/event/check_rep_count.go +++ b/internal/event/check_rep_count.go @@ -81,9 +81,9 @@ func (t *CheckRepCount) checkOneRepCount(fileHash string, execCtx ExecuteContext // 计算所需的最少备份数: // ObjectRep中期望备份数的最大值 // 如果ObjectBlock存在对此文件的引用,则至少为1 - needRepCount := mymath.Max(repMaxCnt, mymath.Max(1, blkCnt)) + needRepCount := mymath.Max(repMaxCnt, mymath.Min(1, blkCnt)) - repNodes, err := mysql.Cache.GetCachingFileNodes(tx, fileHash) + repNodes, err := execCtx.Args.DB.Cache().GetCachingFileNodes(tx, fileHash) if err != nil { return fmt.Errorf("get caching file nodes failed, err: %w", err) } @@ -106,7 +106,7 @@ func (t *CheckRepCount) checkOneRepCount(fileHash string, execCtx ExecuteContext if len(normalNodes) > needRepCount { delNodes := chooseDeleteAvaiRepNodes(allNodes, normalNodes, len(normalNodes)-needRepCount) for _, node := range delNodes { - err := mysql.Cache.ChangeState(tx, fileHash, node.NodeID, consts.CACHE_STATE_TEMP) + err := execCtx.Args.DB.Cache().SetTemp(tx, fileHash, node.NodeID) if err != nil { return fmt.Errorf("change cache state failed, err: %w", err) } @@ -134,7 +134,7 @@ func (t *CheckRepCount) checkOneRepCount(fileHash string, execCtx ExecuteContext } for _, node := range newNodes { - err := mysql.Cache.CreatePinned(tx, fileHash, node.NodeID) + err := execCtx.Args.DB.Cache().CreatePinned(tx, fileHash, node.NodeID) if err != nil { return fmt.Errorf("create cache failed, err: %w", err) } diff --git a/internal/event/update_cache.go b/internal/event/update_cache.go index a72fa3f..173838c 100644 --- a/internal/event/update_cache.go +++ b/internal/event/update_cache.go @@ -3,7 +3,6 @@ package event import ( evtcst "gitlink.org.cn/cloudream/common/consts/event" "gitlink.org.cn/cloudream/common/pkg/logger" - mysql "gitlink.org.cn/cloudream/db/sql" scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" ) @@ -40,7 +39,7 @@ func (t *UpdateCache) Execute(execCtx ExecuteContext) { for _, entry := range t.Entries { switch entry.Operation { case evtcst.UPDATE_CACHE_DELETE_TEMP: - err := mysql.Cache.DeleteTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID) + err := execCtx.Args.DB.Cache().DeleteTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID) if err != nil { log.WithField("FileHash", entry.FileHash). WithField("NodeID", t.NodeID). @@ -52,7 +51,7 @@ func (t *UpdateCache) Execute(execCtx ExecuteContext) { Debugf("delete temp cache") case evtcst.UPDATE_CACHE_CREATE_TEMP: - err := mysql.Cache.CreateTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID) + err := execCtx.Args.DB.Cache().CreateTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID) if err != nil { log.WithField("FileHash", entry.FileHash). WithField("NodeID", t.NodeID). diff --git a/internal/tickevent/batch_check_all_rep_count.go b/internal/tickevent/batch_check_all_rep_count.go index 07f476d..cdb6902 100644 --- a/internal/tickevent/batch_check_all_rep_count.go +++ b/internal/tickevent/batch_check_all_rep_count.go @@ -2,7 +2,6 @@ package tickevent import ( "gitlink.org.cn/cloudream/common/pkg/logger" - mysql "gitlink.org.cn/cloudream/db/sql" "gitlink.org.cn/cloudream/scanner/internal/event" ) @@ -20,7 +19,7 @@ func (e *BatchCheckAllRepCount) Execute(ctx ExecuteContext) { log := logger.WithType[BatchCheckAllRepCount]("TickEvent") log.Debugf("begin") - fileHashes, err := mysql.Cache.BatchGetAllFileHashes(ctx.Args.DB.SQLCtx(), e.lastCheckStart, CHECK_CACHE_BATCH_SIZE) + fileHashes, err := ctx.Args.DB.Cache().BatchGetAllFileHashes(ctx.Args.DB.SQLCtx(), e.lastCheckStart, CHECK_CACHE_BATCH_SIZE) if err != nil { log.Warnf("batch get file hashes failed, err: %s", err.Error()) return