From b92f89a39c78737e0fb2b5a623766fe44df75aba Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 11 Nov 2024 14:31:48 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AE=9A=E6=97=B6=E6=B8=85?= =?UTF-8?q?=E7=90=86=E4=B8=B4=E6=97=B6=E6=96=87=E4=BB=B6=E7=9A=84=E6=9C=BA?= =?UTF-8?q?=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/storage/local/shard_store.go | 67 ++++++++++++++++++++++-- 1 file changed, 62 insertions(+), 5 deletions(-) diff --git a/common/pkgs/storage/local/shard_store.go b/common/pkgs/storage/local/shard_store.go index 9429433..ee56b03 100644 --- a/common/pkgs/storage/local/shard_store.go +++ b/common/pkgs/storage/local/shard_store.go @@ -9,6 +9,7 @@ import ( "os" "path/filepath" "sync" + "time" "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" @@ -24,9 +25,11 @@ const ( ) type ShardStore struct { - stg cdssdk.Storage - cfg cdssdk.LocalShardStorage - lock sync.Mutex + stg cdssdk.Storage + cfg cdssdk.LocalShardStorage + lock sync.Mutex + workingTempFiles map[string]bool + done chan any } func NewShardStore(stg cdssdk.Storage, cfg cdssdk.LocalShardStorage) (*ShardStore, error) { @@ -36,17 +39,67 @@ func NewShardStore(stg cdssdk.Storage, cfg cdssdk.LocalShardStorage) (*ShardStor } return &ShardStore{ - stg: stg, - cfg: cfg, + stg: stg, + cfg: cfg, + workingTempFiles: make(map[string]bool), + done: make(chan any, 1), }, nil } func (s *ShardStore) Start(ch *types.StorageEventChan) { s.getLogger().Infof("local shard store start, root: %v, max size: %v", s.cfg.Root, s.cfg.MaxSize) + + go func() { + removeTempTicker := time.NewTicker(time.Minute * 10) + for { + select { + case <-removeTempTicker.C: + s.removeUnusedTempFiles() + case <-s.done: + return + } + } + }() +} + +func (s *ShardStore) removeUnusedTempFiles() { + s.lock.Lock() + defer s.lock.Unlock() + + log := s.getLogger() + + entries, err := os.ReadDir(filepath.Join(s.cfg.Root, TempDir)) + if err != nil { + log.Warnf("read temp dir: %v", err) + return + } + + for _, entry := range entries { + if entry.IsDir() { + continue + } + + if s.workingTempFiles[entry.Name()] { + continue + } + + path := filepath.Join(s.cfg.Root, TempDir, entry.Name()) + err = os.Remove(path) + if err != nil { + log.Warnf("remove temp file %v: %v", path, err) + } else { + log.Infof("remove unused temp file %v", path) + } + } } func (s *ShardStore) Stop() { s.getLogger().Infof("local shard store stop") + + select { + case s.done <- nil: + default: + } } func (s *ShardStore) New() types.ShardWriter { @@ -65,6 +118,8 @@ func (s *ShardStore) New() types.ShardWriter { return utils.ErrorShardWriter(err) } + s.workingTempFiles[filepath.Base(file.Name())] = true + return &ShardWriter{ path: file.Name(), // file.Name 包含tmpDir路径 file: file, @@ -194,11 +249,13 @@ func (s *ShardStore) onWritterAbort(w *ShardWriter) { s.getLogger().Debugf("writting file %v aborted", w.path) s.removeTempFile(w.path) + delete(s.workingTempFiles, filepath.Base(w.path)) } func (s *ShardStore) onWritterFinish(w *ShardWriter, hash cdssdk.FileHash) (types.FileInfo, error) { s.lock.Lock() defer s.lock.Unlock() + defer delete(s.workingTempFiles, filepath.Base(w.path)) log := s.getLogger()