diff --git a/internal/task/agent_check_cache.go b/internal/task/agent_check_cache.go index 2768acb..a80e096 100644 --- a/internal/task/agent_check_cache.go +++ b/internal/task/agent_check_cache.go @@ -78,7 +78,7 @@ func (t *AgentCheckCacheTask) checkOneAgentCache(entry AgentCheckCacheTaskEntry, var isComplete bool var caches []model.Cache - err := execCtx.MyDB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + err := execCtx.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { // TODO unavailable的节点需不需要发送任务? if entry.FileHashes == nil { diff --git a/internal/task/check_rep_count.go b/internal/task/check_rep_count.go index 4c18683..eaa36ac 100644 --- a/internal/task/check_rep_count.go +++ b/internal/task/check_rep_count.go @@ -7,8 +7,8 @@ import ( "github.com/jmoiron/sqlx" "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/consts" "gitlink.org.cn/cloudream/scanner/internal/config" - "gitlink.org.cn/cloudream/utils/consts" log "gitlink.org.cn/cloudream/utils/logger" mymath "gitlink.org.cn/cloudream/utils/math" mysort "gitlink.org.cn/cloudream/utils/sort" @@ -32,7 +32,7 @@ func (t *CheckRepCountTask) TryMerge(other Task) bool { } func (t *CheckRepCountTask) Execute(execCtx *ExecuteContext, myOpts ExecuteOption) { - var updatedNodeAndHashes map[int][]string + updatedNodeAndHashes := make(map[int][]string) for _, fileHash := range t.FileHashes { updatedNodeIDs, err := t.checkOneRepCount(fileHash, execCtx) @@ -57,7 +57,7 @@ func (t *CheckRepCountTask) Execute(execCtx *ExecuteContext, myOpts ExecuteOptio func (t *CheckRepCountTask) checkOneRepCount(fileHash string, execCtx *ExecuteContext) ([]int, error) { var updatedNodeIDs []int - err := execCtx.MyDB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + err := execCtx.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { repMaxCnt, err := mysql.ObjectRep.GetFileMaxRepCount(tx, fileHash) if err != nil { return fmt.Errorf("get file max rep count failed, err: %w", err) diff --git a/internal/task/executor.go b/internal/task/executor.go index aa93e23..35f867f 100644 --- a/internal/task/executor.go +++ b/internal/task/executor.go @@ -15,7 +15,7 @@ type ExecuteOption struct { } type ExecuteContext struct { Executor *Executor - MyDB *mydb.DB + DB *mydb.DB } type postedTask struct { Task Task diff --git a/internal/task/update_cache.go b/internal/task/update_cache.go new file mode 100644 index 0000000..e54a0ea --- /dev/null +++ b/internal/task/update_cache.go @@ -0,0 +1,57 @@ +package task + +import ( + tskcst "gitlink.org.cn/cloudream/common/consts/task" + mysql "gitlink.org.cn/cloudream/db/sql" + "gitlink.org.cn/cloudream/utils/logger" +) + +type UpdateCacheTaskEntry struct { + FileHash string + NodeID int + Operation string +} + +func NewUpdateCacheTaskEntry(fileHash string, nodeID int, op string) UpdateCacheTaskEntry { + return UpdateCacheTaskEntry{ + FileHash: fileHash, + NodeID: nodeID, + Operation: op, + } +} + +type UpdateCacheTask struct { + Entries []UpdateCacheTaskEntry +} + +func NewUpdateCacheTask(entries []UpdateCacheTaskEntry) UpdateCacheTask { + return UpdateCacheTask{ + Entries: entries, + } +} + +func (t *UpdateCacheTask) TryMerge(other Task) bool { + chkTask, ok := other.(*UpdateCacheTask) + if !ok { + return false + } + + // TODO 可以考虑合并同FileHash和NodeID的记录 + t.Entries = append(t.Entries, chkTask.Entries...) + return true +} + +func (t *UpdateCacheTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) { + for _, entry := range t.Entries { + switch entry.Operation { + case tskcst.UPDATE_CACHE_OP_UNTEMP: + err := mysql.Cache.DeleteTemp(execCtx.DB.SQLCtx(), entry.FileHash, entry.NodeID) + + if err != nil { + logger.WithField("FileHash", entry.FileHash). + WithField("NodeID", entry.NodeID). + Warnf("delete temp cache failed, err: %s", err.Error()) + } + } + } +}