diff --git a/internal/event/agent_check_cache.go b/internal/event/agent_check_cache.go index becdbcd..e038f6e 100644 --- a/internal/event/agent_check_cache.go +++ b/internal/event/agent_check_cache.go @@ -120,7 +120,7 @@ func (t *AgentCheckCache) checkIncrement(execCtx ExecuteContext) { caches = append(caches, ch) } - t.startCheck(execCtx, true, caches) + t.startCheck(execCtx, false, caches) } func (t *AgentCheckCache) startCheck(execCtx ExecuteContext, isComplete bool, caches []model.Cache) { diff --git a/internal/event/agent_check_state.go b/internal/event/agent_check_state.go index 3b6ea1b..5f6137e 100644 --- a/internal/event/agent_check_state.go +++ b/internal/event/agent_check_state.go @@ -61,30 +61,6 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { return } - if node.State != consts.NODE_STATE_NORMAL { - return - } - - // 检查上次上报时间,超时的设置为不可用 - // TODO 没有上报过是否要特殊处理? - if node.LastReportTime != nil && time.Since(*node.LastReportTime) > time.Duration(config.Cfg().NodeUnavailableSeconds)*time.Second { - err := execCtx.Args.DB.Node().UpdateState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE) - if err != nil { - log.WithField("NodeID", t.NodeID).Warnf("set node state failed, err: %s", err.Error()) - return - } - - caches, err := execCtx.Args.DB.Cache().GetNodeCaches(execCtx.Args.DB.SQLCtx(), t.NodeID) - if err != nil { - log.WithField("NodeID", t.NodeID).Warnf("get node caches failed, err: %s", err.Error()) - return - } - - // 补充备份数 - execCtx.Executor.Post(NewCheckRepCount(lo.Map(caches, func(ch model.Cache, index int) string { return ch.FileHash }))) - return - } - agentClient, err := agtcli.NewClient(t.NodeID, &config.Cfg().RabbitMQ) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error()) @@ -92,9 +68,30 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { } defer agentClient.Close() - getResp, err := agentClient.GetState(agtmsg.NewGetState(), rabbitmq.RequestOption{Timeout: time.Minute}) + getResp, err := agentClient.GetState(agtmsg.NewGetState(), rabbitmq.RequestOption{Timeout: time.Second * 30}) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("getting state: %s", err.Error()) + + // 检查上次上报时间,超时的设置为不可用 + // TODO 没有上报过是否要特殊处理? + if node.LastReportTime != nil && time.Since(*node.LastReportTime) > time.Duration(config.Cfg().NodeUnavailableSeconds)*time.Second { + err := execCtx.Args.DB.Node().UpdateState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE) + if err != nil { + log.WithField("NodeID", t.NodeID).Warnf("set node state failed, err: %s", err.Error()) + return + } + + caches, err := execCtx.Args.DB.Cache().GetNodeCaches(execCtx.Args.DB.SQLCtx(), t.NodeID) + if err != nil { + log.WithField("NodeID", t.NodeID).Warnf("get node caches failed, err: %s", err.Error()) + return + } + + // 补充备份数 + execCtx.Executor.Post(NewCheckRepCount(lo.Map(caches, func(ch model.Cache, index int) string { return ch.FileHash }))) + return + } + return } // 根据返回结果修改节点状态 diff --git a/internal/event/agent_check_storage.go b/internal/event/agent_check_storage.go index 3b0c01f..13166f3 100644 --- a/internal/event/agent_check_storage.go +++ b/internal/event/agent_check_storage.go @@ -154,6 +154,7 @@ func (t *AgentCheckStorage) startCheck(execCtx ExecuteContext, stg model.Storage checkResp, err := agentClient.CheckStorage(agtmsg.NewCheckStorage(stg.StorageID, stg.Directory, isComplete, objects), rabbitmq.RequestOption{Timeout: time.Minute}) if err != nil { log.WithField("NodeID", stg.NodeID).Warnf("checking storage: %s", err.Error()) + return } // 根据返回结果修改数据库