Browse Source

增加定时扫描数据目录的机制

gitlink
Sydonian 11 months ago
parent
commit
2bda2e5068
3 changed files with 131 additions and 45 deletions
  1. +8
    -6
      client/internal/cmdline/mount.go
  2. +9
    -4
      client/internal/mount/config/config.go
  3. +114
    -35
      client/internal/mount/vfs/cache/cache.go

+ 8
- 6
client/internal/cmdline/mount.go View File

@@ -93,12 +93,14 @@ func mountCmd(mountPoint string, configPath string) {
uploader := uploader.NewUploader(distlockSvc, &conCol, stgPool, stgMeta, db)

mnt := mount.NewMount(&mntcfg.Config{
CacheDir: "./cache",
MountPoint: mountPoint,
AttrTimeout: time.Second * 5,
UploadPendingTime: time.Second * 10,
CacheActiveTime: time.Second * 10,
CacheExpireTime: time.Second * 60,
DataDir: "./cache/data",
MetaDir: "./cache/meta",
MountPoint: mountPoint,
AttrTimeout: time.Second * 5,
UploadPendingTime: time.Second * 10,
CacheActiveTime: time.Second * 10,
CacheExpireTime: time.Second * 60,
ScanDataDirInterval: 30 * time.Second,
}, db, uploader, &dlder)

ch := mnt.Start()


+ 9
- 4
client/internal/mount/config/config.go View File

@@ -3,10 +3,13 @@ package config
import "time"

type Config struct {
CacheDir string `json:"cacheDir"`
MountPoint string `json:"mountPoint"`
GID uint32 `json:"gid"`
UID uint32 `json:"uid"`
MountPoint string `json:"mountPoint"`
GID uint32 `json:"gid"`
UID uint32 `json:"uid"`
// 缓存数据的目录,可以是已有内容的目录,此时通过挂载点查看文件夹内容时,能看到此目录中已有的文件。
DataDir string `json:"dataDir"`
// 缓存元数据的目录,此目录中保存了文件的元数据,包括文件名、大小、修改时间、权限等信息,目录结构将与DataDir保持一致。
MetaDir string `json:"metaDir"`
AttrTimeout time.Duration `json:"attrTimeout"`
// 被修改的文件在被上传到云端之前的等待时间,如果期间有任何读写操作,则重置等待时间
UploadPendingTime time.Duration `json:"uploadPendingTime"`
@@ -14,4 +17,6 @@ type Config struct {
CacheActiveTime time.Duration `json:"cacheActiveTime"`
// 缓存数据的过期时间,如果文件在此时间内没有被访问过,则从本地删除缓存数据
CacheExpireTime time.Duration `json:"cacheExpireTime"`
// 扫描DataDir目录的间隔时间。
ScanDataDirInterval time.Duration `json:"scanDataDirInterval"`
}

+ 114
- 35
client/internal/mount/vfs/cache/cache.go View File

@@ -38,33 +38,30 @@ type CacheEntryInfo struct {
}

type Cache struct {
cfg *config.Config
db *db.DB
uploader *uploader.Uploader
downloader *downloader.Downloader
cacheDataDir string
cacheMetaDir string
lock *sync.RWMutex
cacheDone chan any
activeCache *trie.Trie[*CacheFile]
cfg *config.Config
db *db.DB
uploader *uploader.Uploader
downloader *downloader.Downloader
lock *sync.RWMutex
cacheDone chan any
activeCache *trie.Trie[*CacheFile]
}

func NewCache(cfg *config.Config, db *db.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Cache {
return &Cache{
cfg: cfg,
db: db,
uploader: uploader,
downloader: downloader,
cacheDataDir: filepath.Join(cfg.CacheDir, "data"),
cacheMetaDir: filepath.Join(cfg.CacheDir, "meta"),
lock: &sync.RWMutex{},
cacheDone: make(chan any),
activeCache: trie.NewTrie[*CacheFile](),
cfg: cfg,
db: db,
uploader: uploader,
downloader: downloader,
lock: &sync.RWMutex{},
cacheDone: make(chan any),
activeCache: trie.NewTrie[*CacheFile](),
}
}

// Start launches the cache background workers: one goroutine that scans the
// cache for expiry/upload handling (scanningCache) and one that periodically
// scans the data directory for untracked files (scanningData). Both run until
// the cacheDone channel is closed.
func (c *Cache) Start() {
	go c.scanningCache()
	go c.scanningData()
}

func (c *Cache) Stop() {
@@ -73,32 +70,18 @@ func (c *Cache) Stop() {

func (c *Cache) GetCacheDataPath(comps ...string) string {
comps2 := make([]string, len(comps)+1)
comps2[0] = c.cacheDataDir
comps2[0] = c.cfg.DataDir
copy(comps2[1:], comps)
return filepath.Join(comps2...)
}

// GetCacheDataPathComps returns the given path components with the cache data
// root directory prepended as the first element.
func (c *Cache) GetCacheDataPathComps(comps ...string) []string {
	full := make([]string, 0, len(comps)+1)
	full = append(full, c.cacheDataDir)
	full = append(full, comps...)
	return full
}

func (c *Cache) GetCacheMetaPath(comps ...string) string {
comps2 := make([]string, len(comps)+1)
comps2[0] = c.cacheMetaDir
comps2[0] = c.cfg.MetaDir
copy(comps2[1:], comps)
return filepath.Join(comps2...)
}

// GetCacheMetaPathComps returns the given path components with the cache meta
// root directory prepended as the first element.
func (c *Cache) GetCacheMetaPathComps(comps ...string) []string {
	full := make([]string, 0, len(comps)+1)
	full = append(full, c.cacheMetaDir)
	full = append(full, comps...)
	return full
}

// 获取指定位置的缓存条目信息。如果路径不存在,则返回nil。
func (c *Cache) Stat(pathComps []string) *CacheEntryInfo {
c.lock.RLock()
@@ -113,7 +96,6 @@ func (c *Cache) Stat(pathComps []string) *CacheEntryInfo {
dataPath := c.GetCacheDataPath(pathComps...)
stat, err := os.Stat(dataPath)
if err != nil {
// logger.Warnf("")
// TODO 日志记录
return nil
}
@@ -136,6 +118,8 @@ func (c *Cache) Stat(pathComps []string) *CacheEntryInfo {
}

// 创建一个缓存文件。如果文件已经存在,则会覆盖已有文件。如果加载过程中发生了错误,或者目标位置是一个目录,则会返回nil。
//
// 记得使用Release减少引用计数
func (c *Cache) CreateFile(pathComps []string) *CacheFile {
c.lock.Lock()
defer c.lock.Unlock()
@@ -162,6 +146,8 @@ func (c *Cache) CreateFile(pathComps []string) *CacheFile {
}

// 尝试加载缓存文件,如果文件不存在,则使用obj的信息创建一个新缓存文件,而如果obj为nil,那么会返回nil。
//
// 记得使用Release减少引用计数
func (c *Cache) LoadFile(pathComps []string, obj *clitypes.Object) *CacheFile {
c.lock.Lock()
defer c.lock.Unlock()
@@ -549,6 +535,99 @@ func (c *Cache) scanningCache() {
}
}

// scanningData periodically walks cfg.DataDir looking for files that exist on
// disk but have no corresponding metadata file under cfg.MetaDir, and loads
// each one into the cache (via LoadFile) so it becomes tracked.
//
// The walk is incremental: at most maxVisitCnt entries are visited and at most
// maxUntrackedFiles untracked files are collected per tick. The walk state —
// the stack of open directory handles plus the mirrored meta-path components —
// is kept across ticks and resumed until the whole tree has been covered.
func (c *Cache) scanningData() {
	ticker := time.NewTicker(c.cfg.ScanDataDirInterval)
	defer ticker.Stop()

	// walkTrace is the stack of open directory handles on the current walk
	// path; walkTraceComps mirrors it element-for-element with path components
	// rooted at cfg.MetaDir, so the meta path of a visited file can be built
	// by joining the slice. The two slices always have equal length.
	var walkTrace []*os.File
	var walkTraceComps []string

	// Close any directory handles still open when the goroutine exits, so a
	// shutdown in the middle of a walk does not leak file descriptors.
	defer func() {
		for _, d := range walkTrace {
			d.Close()
		}
	}()

	for {
		select {
		case <-ticker.C:
		case <-c.cacheDone:
			return
		}

		logger.Infof("begin scanning data dir")

		// An empty stack means the previous walk finished (or this is the
		// first round): start a fresh walk from the data dir root.
		if len(walkTrace) == 0 {
			dir, err := os.Open(c.cfg.DataDir)
			if err != nil {
				logger.Warnf("open data dir: %v", err)
				continue
			}

			walkTrace = []*os.File{dir}
			walkTraceComps = []string{c.cfg.MetaDir}
		}

		const maxVisitCnt = 5000
		const maxUntrackedFiles = 500

		var untrackedFiles [][]string
		visitCnt := 0

		// Visit at most maxVisitCnt entries per round (directories on the
		// path included) and collect at most maxUntrackedFiles untracked files.
		for len(walkTrace) > 0 && visitCnt < maxVisitCnt && len(untrackedFiles) < maxUntrackedFiles {
			lastNode := walkTrace[len(walkTrace)-1]
			visitCnt++

			e, err := lastNode.ReadDir(1)
			if err == io.EOF {
				// Directory exhausted: pop it from the walk stack.
				lastNode.Close()
				walkTrace = walkTrace[:len(walkTrace)-1]
				walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
				continue
			}
			if err != nil {
				logger.Warnf("read dir %v: %v", lastNode.Name(), err)
				lastNode.Close()
				walkTrace = walkTrace[:len(walkTrace)-1]
				walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
				continue
			}

			if e[0].IsDir() {
				// lastNode.Name() is the full path the directory was opened
				// with, so joining it with the entry name yields the child's
				// full path.
				child, err := os.Open(filepath.Join(lastNode.Name(), e[0].Name()))
				if err != nil {
					logger.Warnf("open dir %v: %v", e[0].Name(), err)
					continue
				}
				walkTrace = append(walkTrace, child)
				walkTraceComps = append(walkTraceComps, e[0].Name())
				continue
			}

			// Files above the Package level of the hierarchy are not tracked.
			if len(walkTrace) <= 2 {
				continue
			}

			// Temporarily extend the comps with the file name to build its
			// meta path; restore the slice before the next iteration.
			walkTraceComps = append(walkTraceComps, e[0].Name())
			fileMetaPath := filepath.Join(walkTraceComps...)
			_, err = os.Stat(fileMetaPath)
			if err == nil || !os.IsNotExist(err) {
				walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
				continue
			}

			// No meta file exists: the data file is untracked. Record its
			// path components relative to the data dir (drop the MetaDir
			// root element).
			untrackedFiles = append(untrackedFiles, lo2.ArrayClone(walkTraceComps[1:]))
			walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
		}

		// Loading an untracked file with a nil object creates a cache entry
		// for it; Release drops the reference LoadFile returned.
		for _, comps := range untrackedFiles {
			ch := c.LoadFile(comps, nil)
			if ch != nil {
				ch.Release()
			}
		}

		logger.Infof("%v file visited, %v untracked files found", visitCnt, len(untrackedFiles))
	}
}

func (c *Cache) doUploading(pkgs []*uploadingPackage) {
/// 1. 先尝试创建Package
var sucPkgs []*uploadingPackage


Loading…
Cancel
Save