Compare commits

...

27 Commits

Author SHA1 Message Date
  Sydonian 0d39c2e58c Merge branch 'master' into gitlink 8 months ago
  Sydonian e90a725dc4 Merge branch 'master' into gitlink 8 months ago
  Sydonian c3c9c404c9 Merge branch 'master' into gitlink 8 months ago
  Sydonian 923534cce7 修复代码问题 8 months ago
  Sydonian a781447bc5 Merge branch 'master' into gitlink 8 months ago
  Sydonian 3bd2f6513b Merge branch 'master' into gitlink 9 months ago
  Sydonian db13739a6f Merge branch 'master' into gitlink 9 months ago
  Sydonian 936de43ac7 修复编译问题 9 months ago
  Sydonian 0bd01ca12a 解决冲突 9 months ago
  Sydonian 2ff3f843dd Merge branch 'master' into gitlink 9 months ago
  Sydonian e36aeee71f Merge branch 'master' into gitlink 10 months ago
  Sydonian 9b5c236360 Merge branch 'master' into gitlink 10 months ago
  Sydonian 997b614e63 Merge branch 'master' into gitlink 10 months ago
  Sydonian b4bc51526b Merge branch 'master' into gitlink 10 months ago
  Sydonian 11097097f6 Merge branch 'master' into gitlink 10 months ago
  Sydonian d8f1a79210 Merge branch 'master' into gitlink 10 months ago
  Sydonian 029015a415 解决挂载点不能被其他用户访问的问题 10 months ago
  Sydonian e3dbab2162 修复挂载目录权限问题 10 months ago
  Sydonian 4b1e94a362 修复同步过滤规则 10 months ago
  Sydonian 083e338aae Merge branch 'master' into gitlink 10 months ago
  Sydonian 938fe9b38f Merge branch 'master' into gitlink 11 months ago
  Sydonian c09ac6ee73 清理注释 11 months ago
  Sydonian b08a5d8fed Merge branch 'master' into gitlink 11 months ago
  Sydonian fdcd462189 Merge branch 'master' into gitlink 11 months ago
  Sydonian 591a1343b9 增加重新加载过滤规则的接口 11 months ago
  Sydonian a48a4e87bc Merge branch 'master' into gitlink 11 months ago
  Sydonian 9c30356741 增加针对gitlink的同步规则 11 months ago
8 changed files with 200 additions and 1 deletions
Split View
  1. +14
    -0
      client/internal/http/v1/mount.go
  2. +1
    -0
      client/internal/http/v1/server.go
  3. +7
    -0
      client/internal/mount/mount_linux.go
  4. +3
    -1
      client/internal/mount/mount_win.go
  5. +23
    -0
      client/internal/mount/vfs/cache/cache.go
  6. +128
    -0
      client/internal/mount/vfs/cache/sync_filter.go
  7. +4
    -0
      client/internal/mount/vfs/vfs.go
  8. +20
    -0
      client/sdk/api/v1/mount.go

+ 14
- 0
client/internal/http/v1/mount.go View File

@@ -36,6 +36,20 @@ func (m *MountService) DumpStatus(ctx *gin.Context) {
}))
}

func (m *MountService) ReloadFilter(ctx *gin.Context) {
// log := logger.WithField("HTTP", "Mount.ReloadFilter")

// var req cliapi.MountReloadFilter
// if err := ctx.ShouldBindQuery(&req); err != nil {
// log.Warnf("binding body: %s", err.Error())
// ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
// return
// }

m.svc.Mount.ReloadSyncFilter()
ctx.JSON(http.StatusOK, types.OK(cliapi.MountReloadFilterResp{}))
}

func (m *MountService) StartReclaimSpace(ctx *gin.Context) {
// log := logger.WithField("HTTP", "Mount.ReclaimSpace")
// var req cliapi.MountReclaimSpace


+ 1
- 0
client/internal/http/v1/server.go View File

@@ -81,6 +81,7 @@ func (s *Server) InitRouters(rt gin.IRoutes, ah *auth.Auth) {

rt.GET(cliapi.MountDumpStatusPath, certAuth, s.Mount().DumpStatus)
rt.POST(cliapi.MountStartReclaimSpacePath, certAuth, s.Mount().StartReclaimSpace)
rt.POST(cliapi.MountReloadFilterPath, certAuth, s.Mount().ReloadFilter)

rt.GET(cliapi.TickTockListJobsPath, certAuth, s.TickTock().ListJobs)
rt.POST(cliapi.TickTockRunJobPath, certAuth, s.TickTock().RunJob)


+ 7
- 0
client/internal/mount/mount_linux.go View File

@@ -99,6 +99,13 @@ func (m *Mount) Dump() MountStatus {
}
}

func (m *Mount) ReloadSyncFilter() {
if m.vfs == nil {
return
}
m.vfs.ReloadSyncFilter()
}

func (m *Mount) StartReclaimSpace() {
if m.vfs == nil {
return


+ 3
- 1
client/internal/mount/mount_win.go View File

@@ -39,8 +39,10 @@ func (m *Mount) Dump() MountStatus {
return MountStatus{}
}

func (m *Mount) StartReclaimSpace() {
func (m *Mount) ReloadSyncFilter() {
}

func (m *Mount) StartReclaimSpace() {
}

func (m *Mount) NotifyObjectInvalid(obj clitypes.Object) {


+ 23
- 0
client/internal/mount/vfs/cache/cache.go View File

@@ -24,6 +24,10 @@ import (
clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
)

const (
SyncFilterConfigName = ".cds.sync.filter"
)

type CacheEntry interface {
fuse.FsEntry
// 在虚拟文件系统中的路径,即不包含缓存目录的路径
@@ -59,6 +63,7 @@ type Cache struct {
cacheDone chan any
doFullScan chan any
activeCache *trie.Trie[*CacheFile]
syncFilter *SyncFilter
}

func NewCache(cfg *config.Config, db *db.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Cache {
@@ -71,11 +76,15 @@ func NewCache(cfg *config.Config, db *db.DB, uploader *uploader.Uploader, downlo
cacheDone: make(chan any, 1),
doFullScan: make(chan any, 1),
activeCache: trie.NewTrie[*CacheFile](),
syncFilter: NewSyncFilter(),
}
}

func (c *Cache) Start() {
c.syncFilter.ReloadConfig(c.GetCacheDataPath(SyncFilterConfigName))

go c.scanningCache()

go c.scanningData()
}

@@ -97,6 +106,10 @@ func (c *Cache) GetCacheMetaPath(comps ...string) string {
return filepath.Join(comps2...)
}

func (c *Cache) ReloadSyncFilter() {
c.syncFilter.ReloadConfig(c.GetCacheDataPath(SyncFilterConfigName))
}

func (c *Cache) Dump() CacheStatus {
c.lock.RLock()
defer c.lock.RUnlock()
@@ -677,6 +690,10 @@ func (c *Cache) visitNode(path []string, node *trie.Node[*CacheFile], ch *CacheF
shouldUpload = false
}

if !c.syncFilter.ShouldSync(ch.pathComps, info.Size) {
shouldUpload = false
}

// 1. 本地缓存被修改了,如果一段时间内没有被使用,则进行上传
if shouldUpload && (info.DataRevision > 0 || info.MetaRevision > 0) {
if time.Since(info.FreeTime) < c.cfg.UploadPendingTime {
@@ -823,6 +840,12 @@ func (c *Cache) scanningData() {
// 无条件加载缓存,可能会导致一些不需要被同步到云端的文件在缓存等级降到最低取消跟踪后,又重新被加载进来
// 不过由于扫描频率不高,所以问题不大
walkTraceComps = append(walkTraceComps, e[0].Name())
// 第一个元素是data目录,所以要从第二个元素开始
if !c.syncFilter.ShouldSync(walkTraceComps[1:], e[0].Size()) {
walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
continue
}

untrackedFiles = append(untrackedFiles, lo2.ArrayClone(walkTraceComps[1:]))
walkTraceComps = walkTraceComps[:len(walkTraceComps)-1]
}


+ 128
- 0
client/internal/mount/vfs/cache/sync_filter.go View File

@@ -0,0 +1,128 @@
package cache

import (
"encoding/json"
"os"
"path/filepath"
"strings"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/logger"
)

type SyncFilterRuleConfig struct {
Repo FilterRepoRule `json:"repo"`
Object FilterObjectRule `json:"object"`
}

type FilterRepoRule struct {
BlackListMode bool `json:"blackListMode"` // 是否黑名单模式
Names []string `json:"names"` // 匹配模式
}

type FilterObjectRule struct {
BlackListMode bool `json:"blackListMode"` // 是否黑名单模式
Exts []string `json:"exts"` // 文件名后缀
MinSize int64 `json:"minSize"`
MaxSize int64 `json:"maxSize"` // 0为不限制大小
}

type SyncFilter struct {
lock *sync.RWMutex
config SyncFilterRuleConfig
repoNames map[string]bool
exts map[string]bool
}

func NewSyncFilter() *SyncFilter {
return &SyncFilter{
lock: &sync.RWMutex{},
config: SyncFilterRuleConfig{},
repoNames: make(map[string]bool),
exts: make(map[string]bool),
}
}

func (f *SyncFilter) ShouldSync(fullPathComps []string, size int64) bool {
f.lock.RLock()
defer f.lock.RUnlock()

return f.shouldSyncUnlocked(fullPathComps, size)
}

func (f *SyncFilter) shouldSyncUnlocked(fullPathComps []string, size int64) bool {
// user/repo/objects/pack/pack-xxx.pack 或者 user/repo/objects/xx/xxxxxxxxx
if len(fullPathComps) < 5 {
return false
}

repoName := fullPathComps[1]
if f.repoNames[repoName] {
if f.config.Repo.BlackListMode {
return false
}
} else if !f.config.Repo.BlackListMode {
return false
}

// ext包含"."
ext := filepath.Ext(fullPathComps[len(fullPathComps)-1])
if f.exts[ext] {
if f.config.Object.BlackListMode {
return false
}
} else if !f.config.Object.BlackListMode {
return false
}

if size < f.config.Object.MinSize || (f.config.Object.MaxSize > 0 && size > f.config.Object.MaxSize) {
return false
}

if fullPathComps[2] != "objects" {
return false
}

if fullPathComps[3] == "pack" {
// 避免同步临时文件
if !strings.HasPrefix(fullPathComps[4], "pack-") {
return false
}
} else if len(fullPathComps[3]) == 2 {
// 未压缩的objects
} else {
return false
}

return true
}

func (f *SyncFilter) ReloadConfig(path string) {
data, err := os.ReadFile(path)
if err != nil {
logger.Warnf("loading trace rule config: %v", err)
return
}

cfg := SyncFilterRuleConfig{}
err = json.Unmarshal(data, &cfg)
if err != nil {
logger.Warnf("unmarshal trace rule config: %v", err)
return
}

f.lock.Lock()
defer f.lock.Unlock()

f.config = cfg
f.repoNames = make(map[string]bool)
for _, name := range cfg.Repo.Names {
f.repoNames[name] = true
}
f.exts = make(map[string]bool)
for _, ext := range cfg.Object.Exts {
f.exts[ext] = true
}

logger.Infof("trace rule config reloaded: repoNames=%v, exts=%v, minSize=%d, maxSize=%d", len(f.repoNames), len(f.exts), f.config.Object.MinSize, f.config.Object.MaxSize)
}

+ 4
- 0
client/internal/mount/vfs/vfs.go View File

@@ -43,6 +43,10 @@ func (v *Vfs) Dump() cache.CacheStatus {
return v.cache.Dump()
}

func (v *Vfs) ReloadSyncFilter() {
v.cache.ReloadSyncFilter()
}

func (v *Vfs) ReclaimSpace() {
v.cache.ReclaimSpace()
}

+ 20
- 0
client/sdk/api/v1/mount.go View File

@@ -37,6 +37,26 @@ func (c *MountService) DumpStatus(req MountDumpStatus) (*MountDumpStatusResp, er
return JSONAPI(&c.cfg, c.httpCli, &req, &MountDumpStatusResp{})
}

const MountReloadFilterPath = "/mount/reloadFilter"

type MountReloadFilter struct {
}

func (r *MountReloadFilter) MakeParam() *sdks.RequestParam {
return sdks.MakeQueryParam(http.MethodPost, MountReloadFilterPath, r)
}

type MountReloadFilterResp struct {
}

func (r *MountReloadFilterResp) ParseResponse(resp *http.Response) error {
return sdks.ParseCodeDataJSONResponse(resp, r)
}

func (c *MountService) ReloadFilter(req MountReloadFilter) (*MountReloadFilterResp, error) {
return JSONAPI(&c.cfg, http.DefaultClient, &req, &MountReloadFilterResp{})
}

const MountStartReclaimSpacePath = "/mount/startReclaimSpace"

type StartMountReclaimSpace struct{}


Loading…
Cancel
Save