From 962dc084dc5af08349786841530f23ecbcb6fa32 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 26 May 2023 09:48:34 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BA=8B=E4=BB=B6=E5=A2=9E=E5=8A=A0=E8=B0=83?= =?UTF-8?q?=E8=AF=95=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/event/check_cache.go | 21 +++++++++++++-------- internal/event/check_state.go | 5 +++-- internal/event/check_storage.go | 19 ++++++++++++++----- 3 files changed, 30 insertions(+), 15 deletions(-) diff --git a/internal/event/check_cache.go b/internal/event/check_cache.go index 7ccc316..090298d 100644 --- a/internal/event/check_cache.go +++ b/internal/event/check_cache.go @@ -47,11 +47,12 @@ func (t *CheckCache) TryMerge(other Event) bool { } func (t *CheckCache) Execute(execCtx ExecuteContext) { - logger.Debugf("begin check cache") + log := logger.WithType[CheckCache]("Event") + log.Debugf("begin with %v", logger.FormatStruct(t)) filesMap, err := execCtx.Args.IPFS.GetPinnedFiles() if err != nil { - logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error()) + log.Warnf("get pinned files from ipfs failed, err: %s", err.Error()) return } @@ -64,6 +65,8 @@ func (t *CheckCache) Execute(execCtx ExecuteContext) { } func (t *CheckCache) checkIncrement(filesMap map[string]shell.PinInfo, execCtx ExecuteContext) { + log := logger.WithType[CheckCache]("Event") + var updateCacheOps []scevt.UpdateCacheEntry for _, cache := range t.Caches { @@ -74,7 +77,7 @@ func (t *CheckCache) checkIncrement(filesMap map[string]shell.PinInfo, execCtx E } else if cache.State == consts.CACHE_STATE_TEMP { err := execCtx.Args.IPFS.Unpin(cache.FileHash) if err != nil { - logger.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error()) + log.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error()) } } @@ -86,7 +89,7 @@ func (t *CheckCache) checkIncrement(filesMap map[string]shell.PinInfo, execCtx E // TODO 需要考虑此处是否是同步的过程 err := execCtx.Args.IPFS.Pin(cache.FileHash) if err != nil { - logger.WithField("FileHash", cache.FileHash).Warnf("pin file failed, err: %s", err.Error()) + log.WithField("FileHash", cache.FileHash).Warnf("pin file failed, err: %s", err.Error()) } } else if cache.State == consts.CACHE_STATE_TEMP { @@ -108,12 +111,14 @@ func (t *CheckCache) checkIncrement(filesMap map[string]shell.PinInfo, execCtx E if err == nil { execCtx.Args.Scanner.PostEvent(evtmsg) } else { - logger.Warnf("new post event body failed, err: %s", err.Error()) + log.Warnf("new post event body failed, err: %s", err.Error()) } } } func (t *CheckCache) checkComplete(filesMap map[string]shell.PinInfo, execCtx ExecuteContext) { + log := logger.WithType[CheckCache]("Event") + var updateCacheOps []scevt.UpdateCacheEntry for _, cache := range t.Caches { @@ -124,7 +129,7 @@ func (t *CheckCache) checkComplete(filesMap map[string]shell.PinInfo, execCtx Ex } else if cache.State == consts.CACHE_STATE_TEMP { err := execCtx.Args.IPFS.Unpin(cache.FileHash) if err != nil { - logger.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error()) + log.WithField("FileHash", cache.FileHash).Warnf("unpin file failed, err: %s", err.Error()) } } @@ -136,7 +141,7 @@ func (t *CheckCache) checkComplete(filesMap map[string]shell.PinInfo, execCtx Ex // TODO 需要考虑此处是否是同步的过程 err := execCtx.Args.IPFS.Pin(cache.FileHash) if err != nil { - logger.WithField("FileHash", cache.FileHash).Warnf("pin file failed, err: %s", err.Error()) + log.WithField("FileHash", cache.FileHash).Warnf("pin file failed, err: %s", err.Error()) } } else if cache.State == consts.CACHE_STATE_TEMP { @@ -160,7 +165,7 @@ func (t *CheckCache) checkComplete(filesMap map[string]shell.PinInfo, execCtx Ex if err == nil { execCtx.Args.Scanner.PostEvent(evtmsg) } else { - logger.Warnf("new post event body failed, err: %s", err.Error()) + log.Warnf("new post event body failed, err: %s", err.Error()) } } diff --git a/internal/event/check_state.go b/internal/event/check_state.go index f60989c..7e23f85 100644 --- a/internal/event/check_state.go +++ b/internal/event/check_state.go @@ -22,7 +22,8 @@ func (t *CheckState) TryMerge(other Event) bool { } func (t *CheckState) Execute(execCtx ExecuteContext) { - logger.Debugf("begin check state") + log := logger.WithType[CheckState]("Event") + log.Debugf("begin") ipfsStatus := consts.IPFS_STATUS_OK @@ -35,7 +36,7 @@ func (t *CheckState) Execute(execCtx ExecuteContext) { if err == nil { execCtx.Args.Scanner.PostEvent(evtmsg) } else { - logger.Warnf("new post event body failed, err: %s", err.Error()) + log.Warnf("new post event body failed, err: %s", err.Error()) } } diff --git a/internal/event/check_storage.go b/internal/event/check_storage.go index c388c10..b5c140f 100644 --- a/internal/event/check_storage.go +++ b/internal/event/check_storage.go @@ -33,6 +33,10 @@ func (t *CheckStorage) TryMerge(other Event) bool { return false } + if event.StorageID != t.StorageID { + return false + } + if event.IsComplete { t.IsComplete = true t.Objects = event.Objects @@ -49,13 +53,14 @@ func (t *CheckStorage) TryMerge(other Event) bool { } func (t *CheckStorage) Execute(execCtx ExecuteContext) { - logger.Debugf("begin check storage") + log := logger.WithType[CheckStorage]("Event") + log.Debugf("begin with %v", logger.FormatStruct(t)) dirFullPath := filepath.Join(config.Cfg().StorageBaseDir, t.Directory) infos, err := ioutil.ReadDir(dirFullPath) if err != nil { - logger.Warnf("list storage directory failed, err: %s", err.Error()) + log.Warnf("list storage directory failed, err: %s", err.Error()) evtmsg, err := scmsg.NewPostEventBody(scevt.NewUpdateStorage( t.StorageID, @@ -65,7 +70,7 @@ func (t *CheckStorage) Execute(execCtx ExecuteContext) { if err == nil { execCtx.Args.Scanner.PostEvent(evtmsg) } else { - logger.Warnf("new post event body failed, err: %s", err.Error()) + log.Warnf("new post event body failed, err: %s", err.Error()) } return } @@ -80,6 +85,8 @@ func (t *CheckStorage) Execute(execCtx ExecuteContext) { } func (t *CheckStorage) checkIncrement(fileInfos []fs.FileInfo, execCtx ExecuteContext) { + log := logger.WithType[CheckStorage]("Event") + infosMap := make(map[string]fs.FileInfo) for _, info := range fileInfos { infosMap[info.Name()] = info @@ -110,11 +117,13 @@ func (t *CheckStorage) checkIncrement(fileInfos []fs.FileInfo, execCtx ExecuteCo if err == nil { execCtx.Args.Scanner.PostEvent(evtmsg) } else { - logger.Warnf("new post event body failed, err: %s", err.Error()) + log.Warnf("new post event body failed, err: %s", err.Error()) } } func (t *CheckStorage) checkComplete(fileInfos []fs.FileInfo, execCtx ExecuteContext) { + log := logger.WithType[CheckStorage]("Event") + infosMap := make(map[string]fs.FileInfo) for _, info := range fileInfos { infosMap[info.Name()] = info @@ -145,7 +154,7 @@ func (t *CheckStorage) checkComplete(fileInfos []fs.FileInfo, execCtx ExecuteCon if err == nil { execCtx.Args.Scanner.PostEvent(evtmsg) } else { - logger.Warnf("new post event body failed, err: %s", err.Error()) + log.Warnf("new post event body failed, err: %s", err.Error()) } }