diff --git a/client/internal/cmdline/mount.go b/client/internal/cmdline/mount.go index 1aa0789..f4940b2 100644 --- a/client/internal/cmdline/mount.go +++ b/client/internal/cmdline/mount.go @@ -93,12 +93,14 @@ func mountCmd(mountPoint string, configPath string) { uploader := uploader.NewUploader(distlockSvc, &conCol, stgPool, stgMeta, db) mnt := mount.NewMount(&mntcfg.Config{ - CacheDir: "./cache", - MountPoint: mountPoint, - AttrTimeout: time.Second * 5, - UploadPendingTime: time.Second * 10, - CacheActiveTime: time.Second * 10, - CacheExpireTime: time.Second * 60, + DataDir: "./cache/data", + MetaDir: "./cache/meta", + MountPoint: mountPoint, + AttrTimeout: time.Second * 5, + UploadPendingTime: time.Second * 10, + CacheActiveTime: time.Second * 10, + CacheExpireTime: time.Second * 60, + ScanDataDirInterval: 30 * time.Second, }, db, uploader, &dlder) ch := mnt.Start() diff --git a/client/internal/mount/config/config.go b/client/internal/mount/config/config.go index 922e90d..5cb617b 100644 --- a/client/internal/mount/config/config.go +++ b/client/internal/mount/config/config.go @@ -3,10 +3,13 @@ package config import "time" type Config struct { - CacheDir string `json:"cacheDir"` - MountPoint string `json:"mountPoint"` - GID uint32 `json:"gid"` - UID uint32 `json:"uid"` + MountPoint string `json:"mountPoint"` + GID uint32 `json:"gid"` + UID uint32 `json:"uid"` + // 缓存数据的目录,可以是已有内容的目录,此时通过挂载点查看文件夹内容时,能看到此目录中已有的文件。 + DataDir string `json:"dataDir"` + // 缓存元数据的目录,此目录中保存了文件的元数据,包括文件名、大小、修改时间、权限等信息,目录结构将与DataDir保持一致。 + MetaDir string `json:"metaDir"` AttrTimeout time.Duration `json:"attrTimeout"` // 被修改的文件在被上传到云端之前的等待时间,如果期间有任何读写操作,则重置等待时间 UploadPendingTime time.Duration `json:"uploadPendingTime"` @@ -14,4 +17,6 @@ type Config struct { CacheActiveTime time.Duration `json:"cacheActiveTime"` // 缓存数据的过期时间,如果文件在此时间内没有被访问过,则从本地删除缓存数据 CacheExpireTime time.Duration `json:"cacheExpireTime"` + // 扫描DataDir目录的间隔时间。 + ScanDataDirInterval time.Duration `json:"scanDataDirInterval"` } diff --git a/client/internal/mount/vfs/cache/cache.go b/client/internal/mount/vfs/cache/cache.go index d520ab8..aac1b53 100644 --- a/client/internal/mount/vfs/cache/cache.go +++ b/client/internal/mount/vfs/cache/cache.go @@ -38,33 +38,30 @@ type CacheEntryInfo struct { } type Cache struct { - cfg *config.Config - db *db.DB - uploader *uploader.Uploader - downloader *downloader.Downloader - cacheDataDir string - cacheMetaDir string - lock *sync.RWMutex - cacheDone chan any - activeCache *trie.Trie[*CacheFile] + cfg *config.Config + db *db.DB + uploader *uploader.Uploader + downloader *downloader.Downloader + lock *sync.RWMutex + cacheDone chan any + activeCache *trie.Trie[*CacheFile] } func NewCache(cfg *config.Config, db *db.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Cache { return &Cache{ - cfg: cfg, - db: db, - uploader: uploader, - downloader: downloader, - cacheDataDir: filepath.Join(cfg.CacheDir, "data"), - cacheMetaDir: filepath.Join(cfg.CacheDir, "meta"), - lock: &sync.RWMutex{}, - cacheDone: make(chan any), - activeCache: trie.NewTrie[*CacheFile](), + cfg: cfg, + db: db, + uploader: uploader, + downloader: downloader, + lock: &sync.RWMutex{}, + cacheDone: make(chan any), + activeCache: trie.NewTrie[*CacheFile](), } } func (c *Cache) Start() { go c.scanningCache() + go c.scanningData() } func (c *Cache) Stop() { @@ -73,32 +70,18 @@ func (c *Cache) Stop() { func (c *Cache) GetCacheDataPath(comps ...string) string { comps2 := make([]string, len(comps)+1) - comps2[0] = c.cacheDataDir + comps2[0] = c.cfg.DataDir copy(comps2[1:], comps) return filepath.Join(comps2...) } -func (c *Cache) GetCacheDataPathComps(comps ...string) []string { - comps2 := make([]string, len(comps)+1) - comps2[0] = c.cacheDataDir - copy(comps2[1:], comps) - return comps2 -} - func (c *Cache) GetCacheMetaPath(comps ...string) string { comps2 := make([]string, len(comps)+1) - comps2[0] = c.cacheMetaDir + comps2[0] = c.cfg.MetaDir copy(comps2[1:], comps) return filepath.Join(comps2...) } -func (c *Cache) GetCacheMetaPathComps(comps ...string) []string { - comps2 := make([]string, len(comps)+1) - comps2[0] = c.cacheMetaDir - copy(comps2[1:], comps) - return comps2 -} - // 获取指定位置的缓存条目信息。如果路径不存在,则返回nil。 func (c *Cache) Stat(pathComps []string) *CacheEntryInfo { c.lock.RLock() @@ -113,7 +96,6 @@ func (c *Cache) Stat(pathComps []string) *CacheEntryInfo { dataPath := c.GetCacheDataPath(pathComps...) stat, err := os.Stat(dataPath) if err != nil { - // logger.Warnf("") // TODO 日志记录 return nil } @@ -136,6 +118,8 @@ func (c *Cache) Stat(pathComps []string) *CacheEntryInfo { } // 创建一个缓存文件。如果文件已经存在,则会覆盖已有文件。如果加载过程中发生了错误,或者目标位置是一个目录,则会返回nil。 +// +// 记得使用Release减少引用计数 func (c *Cache) CreateFile(pathComps []string) *CacheFile { c.lock.Lock() defer c.lock.Unlock() @@ -162,6 +146,8 @@ func (c *Cache) CreateFile(pathComps []string) *CacheFile { } // 尝试加载缓存文件,如果文件不存在,则使用obj的信息创建一个新缓存文件,而如果obj为nil,那么会返回nil。 +// +// 记得使用Release减少引用计数 func (c *Cache) LoadFile(pathComps []string, obj *clitypes.Object) *CacheFile { c.lock.Lock() defer c.lock.Unlock() @@ -549,6 +535,99 @@ func (c *Cache) scanningCache() { } } +func (c *Cache) scanningData() { + ticker := time.NewTicker(c.cfg.ScanDataDirInterval) + defer ticker.Stop() + + var walkTrace []*os.File + var walkTraceComps []string + for { + select { + case <-ticker.C: + case <-c.cacheDone: + return + } + + logger.Infof("begin scanning data dir") + + if len(walkTrace) == 0 { + dir, err := os.Open(c.cfg.DataDir) + if err != nil { + logger.Warnf("open data dir: %v", err) + continue + } + + walkTrace = []*os.File{dir} + walkTraceComps = []string{c.cfg.MetaDir} + } + + const maxVisitCnt = 5000 + const maxUntrackedFiles = 500 + + var untrackedFiles [][]string + visitCnt := 0 + + // 一次最多遍历5000个文件(包括路径上的文件夹),一次最多添加500个未跟踪文件 + for len(walkTrace) > 0 && visitCnt < maxVisitCnt && len(untrackedFiles) < maxUntrackedFiles { + lastNode := walkTrace[len(walkTrace)-1] + visitCnt++ + + e, err := lastNode.ReadDir(1) + if err == io.EOF { + lastNode.Close() + walkTrace = walkTrace[:len(walkTrace)-1] + walkTraceComps = walkTraceComps[:len(walkTraceComps)-1] + continue + } + if err != nil { + logger.Warnf("read dir %v: %v", lastNode.Name(), err) + lastNode.Close() + walkTrace = walkTrace[:len(walkTrace)-1] + walkTraceComps = walkTraceComps[:len(walkTraceComps)-1] + continue + } + + if e[0].IsDir() { + child, err := os.Open(filepath.Join(lastNode.Name(), e[0].Name())) + if err != nil { + logger.Warnf("open dir %v: %v", e[0].Name(), err) + continue + } + walkTrace = append(walkTrace, child) + walkTraceComps = append(walkTraceComps, e[0].Name()) + continue + } + + // 对于不在Package层级的文件,不跟踪 + if len(walkTrace) <= 2 { + continue + } + + walkTraceComps = append(walkTraceComps, e[0].Name()) + fileMetaPath := filepath.Join(walkTraceComps...) + _, err = os.Stat(fileMetaPath) + if err == nil || !os.IsNotExist(err) { + walkTraceComps = walkTraceComps[:len(walkTraceComps)-1] + continue + } + + untrackedFiles = append(untrackedFiles, lo2.ArrayClone(walkTraceComps[1:])) + walkTraceComps = walkTraceComps[:len(walkTraceComps)-1] + } + + if len(untrackedFiles) > 0 { + for _, comps := range untrackedFiles { + ch := c.LoadFile(comps, nil) + if ch != nil { + ch.Release() + } + } + } + + logger.Infof("%v file visited, %v untracked files found", visitCnt, len(untrackedFiles)) + } +} + func (c *Cache) doUploading(pkgs []*uploadingPackage) { /// 1. 先尝试创建Package var sucPkgs []*uploadingPackage