Browse Source

优化PostEvent的接口;client增加直接触发事件的接口

gitlink
Sydonian 2 years ago
parent
commit
5566b02941
3 changed files with 22 additions and 37 deletions
  1. +6
    -11
      internal/event/check_cache.go
  2. +3
    -6
      internal/event/check_state.go
  3. +13
    -20
      internal/event/check_storage.go

+ 6
- 11
internal/event/check_cache.go View File

@@ -11,7 +11,6 @@ import (
"gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/common/pkg/logger"
"gitlink.org.cn/cloudream/db/model" "gitlink.org.cn/cloudream/db/model"
agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event" agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event"
scmsg "gitlink.org.cn/cloudream/rabbitmq/message/scanner"
scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
) )


@@ -103,15 +102,13 @@ func (t *CheckCache) checkIncrement(filesMap map[string]shell.PinInfo, execCtx E
// 增量情况下,不需要对filesMap中没检查的记录进行处理 // 增量情况下,不需要对filesMap中没检查的记录进行处理


if len(updateCacheOps) > 0 { if len(updateCacheOps) > 0 {
evtmsg, err := scmsg.NewPostEventBody(
err := execCtx.Args.Scanner.PostEvent(
scevt.NewUpdateCache(config.Cfg().ID, updateCacheOps), scevt.NewUpdateCache(config.Cfg().ID, updateCacheOps),
execCtx.Option.IsEmergency, execCtx.Option.IsEmergency,
execCtx.Option.DontMerge, execCtx.Option.DontMerge,
) )
if err == nil {
execCtx.Args.Scanner.PostEvent(evtmsg)
} else {
log.Warnf("new post event body failed, err: %s", err.Error())
if err != nil {
log.Warnf("post event to scanner failed, err: %s", err.Error())
} }
} }
} }
@@ -157,15 +154,13 @@ func (t *CheckCache) checkComplete(filesMap map[string]shell.PinInfo, execCtx Ex
updateCacheOps = append(updateCacheOps, scevt.NewUpdateCacheEntry(hash, evcst.UPDATE_CACHE_CREATE_TEMP)) updateCacheOps = append(updateCacheOps, scevt.NewUpdateCacheEntry(hash, evcst.UPDATE_CACHE_CREATE_TEMP))
} }


evtmsg, err := scmsg.NewPostEventBody(
err := execCtx.Args.Scanner.PostEvent(
scevt.NewUpdateCache(config.Cfg().ID, updateCacheOps), scevt.NewUpdateCache(config.Cfg().ID, updateCacheOps),
execCtx.Option.IsEmergency, execCtx.Option.IsEmergency,
execCtx.Option.DontMerge, execCtx.Option.DontMerge,
) )
if err == nil {
execCtx.Args.Scanner.PostEvent(evtmsg)
} else {
log.Warnf("new post event body failed, err: %s", err.Error())
if err != nil {
log.Warnf("post event to scanner failed, err: %s", err.Error())
} }
} }




+ 3
- 6
internal/event/check_state.go View File

@@ -5,7 +5,6 @@ import (
"gitlink.org.cn/cloudream/common/consts" "gitlink.org.cn/cloudream/common/consts"
"gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/common/pkg/logger"
agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event" agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event"
scmsg "gitlink.org.cn/cloudream/rabbitmq/message/scanner"
scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
) )


@@ -32,11 +31,9 @@ func (t *CheckState) Execute(execCtx ExecuteContext) {
} }


// 紧急任务 // 紧急任务
evtmsg, err := scmsg.NewPostEventBody(scevt.NewUpdateAgentState(config.Cfg().ID, ipfsStatus), true, true)
if err == nil {
execCtx.Args.Scanner.PostEvent(evtmsg)
} else {
log.Warnf("new post event body failed, err: %s", err.Error())
err := execCtx.Args.Scanner.PostEvent(scevt.NewUpdateAgentState(config.Cfg().ID, ipfsStatus), true, true)
if err != nil {
log.Warnf("post event to scanner failed, err: %s", err.Error())
} }
} }




+ 13
- 20
internal/event/check_storage.go View File

@@ -13,7 +13,6 @@ import (
"gitlink.org.cn/cloudream/common/utils" "gitlink.org.cn/cloudream/common/utils"
"gitlink.org.cn/cloudream/db/model" "gitlink.org.cn/cloudream/db/model"
agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event" agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event"
scmsg "gitlink.org.cn/cloudream/rabbitmq/message/scanner"
scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
) )


@@ -62,15 +61,12 @@ func (t *CheckStorage) Execute(execCtx ExecuteContext) {
if err != nil { if err != nil {
log.Warnf("list storage directory failed, err: %s", err.Error()) log.Warnf("list storage directory failed, err: %s", err.Error())


evtmsg, err := scmsg.NewPostEventBody(scevt.NewUpdateStorage(
t.StorageID,
err.Error(),
nil,
), execCtx.Option.IsEmergency, execCtx.Option.DontMerge)
if err == nil {
execCtx.Args.Scanner.PostEvent(evtmsg)
} else {
log.Warnf("new post event body failed, err: %s", err.Error())
err := execCtx.Args.Scanner.PostEvent(scevt.NewUpdateStorage(t.StorageID, err.Error(), nil),
execCtx.Option.IsEmergency,
execCtx.Option.DontMerge,
)
if err != nil {
log.Warnf("post event to scanner failed, err: %s", err.Error())
} }
return return
} }
@@ -109,15 +105,13 @@ func (t *CheckStorage) checkIncrement(fileInfos []fs.FileInfo, execCtx ExecuteCo
} }


// 增量情况下,不需要对infosMap中没检查的记录进行处理 // 增量情况下,不需要对infosMap中没检查的记录进行处理
evtmsg, err := scmsg.NewPostEventBody(
err := execCtx.Args.Scanner.PostEvent(
scevt.NewUpdateStorage(t.StorageID, consts.STORAGE_DIRECTORY_STATUS_OK, updateStorageOps), scevt.NewUpdateStorage(t.StorageID, consts.STORAGE_DIRECTORY_STATUS_OK, updateStorageOps),
execCtx.Option.IsEmergency, execCtx.Option.IsEmergency,
execCtx.Option.DontMerge, execCtx.Option.DontMerge,
) )
if err == nil {
execCtx.Args.Scanner.PostEvent(evtmsg)
} else {
log.Warnf("new post event body failed, err: %s", err.Error())
if err != nil {
log.Warnf("post event to scanner failed, err: %s", err.Error())
} }
} }


@@ -146,15 +140,14 @@ func (t *CheckStorage) checkComplete(fileInfos []fs.FileInfo, execCtx ExecuteCon
} }


// Storage中多出来的文件不做处理 // Storage中多出来的文件不做处理
evtmsg, err := scmsg.NewPostEventBody(

err := execCtx.Args.Scanner.PostEvent(
scevt.NewUpdateStorage(t.StorageID, consts.STORAGE_DIRECTORY_STATUS_OK, updateStorageOps), scevt.NewUpdateStorage(t.StorageID, consts.STORAGE_DIRECTORY_STATUS_OK, updateStorageOps),
execCtx.Option.IsEmergency, execCtx.Option.IsEmergency,
execCtx.Option.DontMerge, execCtx.Option.DontMerge,
) )
if err == nil {
execCtx.Args.Scanner.PostEvent(evtmsg)
} else {
log.Warnf("new post event body failed, err: %s", err.Error())
if err != nil {
log.Warnf("post event to scanner failed, err: %s", err.Error())
} }
} }




Loading…
Cancel
Save