diff --git a/client2/internal/cmd/mount.go b/client2/internal/cmd/mount.go index 889ad3a..9e2e852 100644 --- a/client2/internal/cmd/mount.go +++ b/client2/internal/cmd/mount.go @@ -13,11 +13,13 @@ import ( stgglb "gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage/common/pkgs/db2" + "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader/strategy" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" "gitlink.org.cn/cloudream/storage/common/pkgs/metacache" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" + "gitlink.org.cn/cloudream/storage/common/pkgs/uploader" ) func init() { @@ -68,22 +70,36 @@ func mountCmd(mountPoint string, configPath string) { hubMeta := metacacheHost.AddHubMeta() conMeta := metacacheHost.AddConnectivity() + // 分布式锁 + distlockSvc, err := distlock.NewService(&config.Cfg().DistLock) + if err != nil { + logger.Warnf("new distlock service failed, err: %s", err.Error()) + os.Exit(1) + } + go serveDistLock(distlockSvc) + // 初始化下载策略选择器 strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) // 初始化下载器 dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgAgts, strgSel) + // 上传器 + uploader := uploader.NewUploader(distlockSvc, &conCol, stgAgts, stgMeta) + db, err := db2.NewDB(&config.Cfg().DB) if err != nil { logger.Fatalf("new db2 failed, err: %s", err.Error()) } mnt := mount.NewMount(&mntcfg.Config{ - CacheDir: "./cache", - MountPoint: mountPoint, - AttrTimeout: time.Second * 5, - }, db, &dlder) + CacheDir: "./cache", + MountPoint: mountPoint, + AttrTimeout: time.Second * 5, + UploadPendingTime: time.Second * 10, + CacheActiveTime: time.Second * 10, + CacheExpireTime: time.Second * 60, + }, db, uploader, &dlder) ch := mnt.Start() for { @@ -103,3 +119,18 @@ func mountCmd(mountPoint string, configPath string) { } } } + +func serveDistLock(svc *distlock.Service) { + logger.Info("start serving distlock") + + err := svc.Serve() + + if err != nil { + logger.Errorf("distlock stopped with error: %s", err.Error()) + } + + logger.Info("distlock stopped") + + // TODO 仅简单结束了程序 + os.Exit(1) +} diff --git a/client2/internal/mount/config/config.go b/client2/internal/mount/config/config.go index 32a0bd6..922e90d 100644 --- a/client2/internal/mount/config/config.go +++ b/client2/internal/mount/config/config.go @@ -8,4 +8,10 @@ type Config struct { GID uint32 `json:"gid"` UID uint32 `json:"uid"` AttrTimeout time.Duration `json:"attrTimeout"` + // 被修改的文件在被上传到云端之前的等待时间,如果期间有任何读写操作,则重置等待时间 + UploadPendingTime time.Duration `json:"uploadPendingTime"` + // 被加载到内存的缓存文件信息的过期时间,如果文件在此时间内没有被访问过,则从缓存中删除 + CacheActiveTime time.Duration `json:"cacheActiveTime"` + // 缓存数据的过期时间,如果文件在此时间内没有被访问过,则从本地删除缓存数据 + CacheExpireTime time.Duration `json:"cacheExpireTime"` } diff --git a/client2/internal/mount/mount.go b/client2/internal/mount/mount.go index 0abfccc..2de2590 100644 --- a/client2/internal/mount/mount.go +++ b/client2/internal/mount/mount.go @@ -12,6 +12,7 @@ import ( "gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs" "gitlink.org.cn/cloudream/storage/common/pkgs/db2" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" + "gitlink.org.cn/cloudream/storage/common/pkgs/uploader" ) type MountEvent interface { @@ -33,8 +34,8 @@ type Mount struct { fuse *fuse2.Fuse } -func NewMount(cfg *config.Config, db *db2.DB, downloader *downloader.Downloader) *Mount { - vfs := vfs.NewVfs(cfg, db, downloader) +func NewMount(cfg *config.Config, db *db2.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Mount { + vfs := vfs.NewVfs(cfg, db, uploader, downloader) fuse := fuse2.NewFuse(cfg, vfs) return &Mount{ diff --git a/client2/internal/mount/mount_win.go b/client2/internal/mount/mount_win.go index 18c2e87..5bfc91f 100644 --- a/client2/internal/mount/mount_win.go +++ b/client2/internal/mount/mount_win.go @@ -10,6 +10,7 @@ import ( "gitlink.org.cn/cloudream/storage/client2/internal/mount/config" "gitlink.org.cn/cloudream/storage/common/pkgs/db2" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" + "gitlink.org.cn/cloudream/storage/common/pkgs/uploader" ) type MountEvent interface { @@ -28,7 +29,7 @@ type MountingFailedEvent struct { type Mount struct { } -func NewMount(cfg *config.Config, db *db2.DB, downloader *downloader.Downloader) *Mount { +func NewMount(cfg *config.Config, db *db2.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Mount { return &Mount{} } diff --git a/client2/internal/mount/vfs/cache/cache.go b/client2/internal/mount/vfs/cache/cache.go index 6b1ef4b..0434d7a 100644 --- a/client2/internal/mount/vfs/cache/cache.go +++ b/client2/internal/mount/vfs/cache/cache.go @@ -2,19 +2,25 @@ package cache import ( "errors" + "io" "os" "path/filepath" "sync" "syscall" "time" + "github.com/inhies/go-bytesize" + "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/trie" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/storage/client2/internal/mount/config" "gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse" "gitlink.org.cn/cloudream/storage/common/pkgs/db2" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" + "gitlink.org.cn/cloudream/storage/common/pkgs/uploader" ) type CacheEntry interface { @@ -32,22 +38,25 @@ type CacheEntryInfo struct { } type Cache struct { + cfg *config.Config db *db2.DB + uploader *uploader.Uploader downloader *downloader.Downloader cacheDataDir string cacheMetaDir string lock *sync.RWMutex cacheDone chan any activeCache *trie.Trie[*CacheFile] - freeCache []*CacheFile } -func NewCache(db *db2.DB, downloader *downloader.Downloader, cacheDataDir, cacheMetaDir string) *Cache { +func NewCache(cfg *config.Config, db *db2.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Cache { return &Cache{ + cfg: cfg, db: db, + uploader: uploader, downloader: downloader, - cacheDataDir: cacheDataDir, - cacheMetaDir: cacheMetaDir, + cacheDataDir: filepath.Join(cfg.CacheDir, "data"), + cacheMetaDir: filepath.Join(cfg.CacheDir, "meta"), lock: &sync.RWMutex{}, cacheDone: make(chan any), activeCache: trie.NewTrie[*CacheFile](), @@ -55,7 +64,7 @@ func NewCache(db *db2.DB, downloader *downloader.Downloader, cacheDataDir, cache } func (c *Cache) Start() { - go c.clearFreeCache() + go c.scanningCache() } func (c *Cache) Stop() { @@ -130,13 +139,21 @@ func (c *Cache) CreateFile(pathComps []string) *CacheFile { c.lock.Lock() defer c.lock.Unlock() + node, ok := c.activeCache.WalkEnd(pathComps) + if ok && node.Value != nil { + node.Value.Delete() + if node.Value.state.uploading != nil { + node.Value.state.uploading.isDeleted = true + } + } + ch, err := createNewCacheFile(c, pathComps) if err != nil { logger.Warnf("create new cache file %v: %v", pathComps, err) return nil } - ch.refCount++ + ch.state.refCount++ c.activeCache.CreateWords(pathComps).Value = ch logger.Debugf("create new cache file %v", pathComps) @@ -150,13 +167,21 @@ func (c *Cache) LoadFile(pathComps []string, obj *cdssdk.Object) *CacheFile { node, ok := c.activeCache.WalkEnd(pathComps) if ok && node.Value != nil { + if !node.Value.state.isLoaded { + err := node.Value.Load() + if err != nil { + logger.Warnf("load cache %v: %v", pathComps, err) + return nil + } + } + return node.Value } ch, err := loadCacheFile(c, pathComps) if err == nil { ch.remoteObj = obj - ch.refCount++ + ch.state.refCount++ c.activeCache.CreateWords(pathComps).Value = ch logger.Debugf("load cache %v", pathComps) @@ -179,7 +204,7 @@ func (c *Cache) LoadFile(pathComps []string, obj *cdssdk.Object) *CacheFile { return nil } - ch.refCount++ + ch.state.refCount++ c.activeCache.CreateWords(pathComps).Value = ch logger.Debugf("create cache %v from object %v", pathComps, obj.ObjectID) @@ -293,7 +318,9 @@ func (c *Cache) Remove(pathComps []string) error { if node.Value != nil { node.Value.Delete() - c.freeCache = lo2.Remove(c.freeCache, node.Value) + if node.Value.state.uploading != nil { + node.Value.state.uploading.isDeleted = true + } } node.RemoveSelf(true) @@ -369,10 +396,26 @@ func (c *Cache) Move(pathComps []string, newPathComps []string) error { return nil } -func (c *Cache) clearFreeCache() { +type uploadingPackage struct { + bktName string + pkgName string + pkg cdssdk.Package + upObjs []*uploadingObject +} + +type uploadingObject struct { + pathComps []string + cache *CacheFile + reader *CacheFileReadWriter + isDeleted bool + isSuccess bool +} + +func (c *Cache) scanningCache() { ticker := time.NewTicker(time.Second * 5) defer ticker.Stop() + lastScanPath := []string{} for { select { @@ -385,17 +428,228 @@ func (c *Cache) clearFreeCache() { } c.lock.Lock() - for i, ch := range c.freeCache { - if time.Since(ch.freeTime) > time.Second*30 { - ch.Free() - node, _ := c.activeCache.WalkEnd(ch.PathComps()) - node.RemoveSelf(true) - c.freeCache[i] = nil - - logger.Debugf("cache %v freed", ch.PathComps()) + + type packageFullName struct { + bktName string + pkgName string + } + + uploadingPkgs := make(map[packageFullName]*uploadingPackage) + + visitCnt := 0 + visitBreak := false + + node, _ := c.activeCache.WalkEnd(lastScanPath) + node.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl { + ch := node.Value + if ch == nil { + return trie.VisitContinue + } + + if ch.state.refCount > 0 { + logger.Debugf("skip cache %v, refCount: %v", path, ch.state.refCount) + return trie.VisitContinue + } + + visitCnt++ + if ch.Revision() > 0 { + // 1. 本地缓存被修改了,如果一段时间内没有被使用,则进行上传 + + // 不存放在Package里的文件,不需要上传 + if len(ch.pathComps) <= 2 { + return trie.VisitContinue + } + + if time.Since(ch.state.freeTime) > c.cfg.UploadPendingTime && ch.state.uploading == nil { + fullName := packageFullName{ch.pathComps[0], ch.pathComps[1]} + pkg, ok := uploadingPkgs[fullName] + if !ok { + pkg = &uploadingPackage{ + bktName: ch.pathComps[0], + pkgName: ch.pathComps[1], + } + uploadingPkgs[fullName] = pkg + } + obj := &uploadingObject{ + pathComps: lo2.ArrayClone(ch.pathComps), + cache: ch, + reader: ch.OpenReadWhenScanning(), + } + pkg.upObjs = append(pkg.upObjs, obj) + ch.state.uploading = obj + } + } else if ch.state.isLoaded { + // 2. 本地缓存没有被修改,如果一段时间内没有被使用,则进行卸载 + + if time.Since(ch.state.freeTime) > c.cfg.CacheActiveTime { + ch.Unload() + ch.state.isLoaded = false + ch.state.unloadTime = time.Now() + } + } else { + // 3. 卸载后的缓存,如果一段时间内没有被使用,则进行删除。 + // 能达到这个阶段,则肯定已经被同步到远端了 + + if time.Since(ch.state.unloadTime) > c.cfg.CacheExpireTime { + ch.Delete() + node.RemoveSelf(true) + } + } + + // 每次最多遍历500个节点,防止占用锁太久 + if visitCnt > 500 { + lastScanPath = lo2.ArrayClone(path) + visitBreak = true + return trie.VisitBreak + } + return trie.VisitContinue + }) + if !visitBreak { + lastScanPath = []string{} + } + + c.lock.Unlock() + + if len(uploadingPkgs) > 0 { + go c.doUploading(lo.Values(uploadingPkgs)) + } + } +} + +func (c *Cache) doUploading(pkgs []*uploadingPackage) { + /// 1. 先查询每个Package的信息,如果不存在,则暂时不上传 + var sucPkgs []*uploadingPackage + var failedPkgs []*uploadingPackage + for _, pkg := range pkgs { + // TODO 用户ID + p, err := c.db.Package().GetUserPackageByName(c.db.DefCtx(), 1, pkg.bktName, pkg.pkgName) + if err != nil { + logger.Warnf("get user package %v/%v: %v", pkg.bktName, pkg.pkgName, err) + failedPkgs = append(failedPkgs, pkg) + continue + } + + pkg.pkg = p + sucPkgs = append(sucPkgs, pkg) + } + + /// 2. 对于查询失败的Package,直接关闭文件,不进行上传 + // 在锁的保护下取消上传状态 + c.lock.Lock() + for _, pkg := range failedPkgs { + for _, obj := range pkg.upObjs { + obj.cache.state.uploading = nil + } + } + c.lock.Unlock() + // 关闭文件必须在锁外 + for _, pkg := range failedPkgs { + for _, obj := range pkg.upObjs { + obj.reader.Close() + } + } + + /// 3. 开始上传每个Package + for _, p := range sucPkgs { + uploader, err := c.uploader.BeginUpdate(1, p.pkg.PackageID, 0, nil, nil) + if err != nil { + logger.Warnf("begin update package %v/%v: %v", p.bktName, p.pkgName, err) + continue + } + + upSuc := 0 + upSucAmt := int64(0) + upFailed := 0 + upStartTime := time.Now() + + logger.Infof("begin uploading %v objects to package %v/%v", len(p.upObjs), p.bktName, p.pkgName) + for _, o := range p.upObjs { + rd := cacheFileReader{ + rw: o.reader, + } + + counter := io2.Counter(&rd) + + err = uploader.Upload(cdssdk.JoinObjectPath(o.pathComps[2:]...), counter) + if err != nil { + logger.Warnf("upload object %v: %v", o.pathComps, err) + upFailed++ + continue + } + + o.isSuccess = true + upSuc++ + upSucAmt += counter.Count() + } + + // 在锁保护下登记上传结果 + c.lock.Lock() + + upCancel := 0 + upRename := 0 + + // 检查是否有文件在上传期间发生了变化 + var sucObjs []*uploadingObject + for _, o := range p.upObjs { + o.cache.state.uploading = nil + if !o.isSuccess { + continue } + + oldPath := cdssdk.JoinObjectPath(o.pathComps[2:]...) + if o.isDeleted { + uploader.CancelObject(oldPath) + upCancel++ + continue + } + + newPath := cdssdk.JoinObjectPath(o.cache.pathComps[2:]...) + if newPath != oldPath { + uploader.RenameObject(oldPath, newPath) + upRename++ + } + + sucObjs = append(sucObjs, o) } - c.freeCache = lo2.RemoveAllDefault(c.freeCache) + + _, err = uploader.Commit() + if err != nil { + logger.Warnf("commit update package %v/%v: %v", p.bktName, p.pkgName, err) + } else { + for _, obj := range sucObjs { + obj.cache.RevisionUploaded(obj.reader.revision) + } + + upTime := time.Since(upStartTime) + logger.Infof("upload package %v/%v in %v, upload: %v, size: %v, speed: %v/s, cancel: %v, rename: %v", + p.bktName, p.pkgName, upTime, upSuc, upSucAmt, bytesize.New(float64(upSucAmt)/upTime.Seconds()), upCancel, upRename) + } + c.lock.Unlock() + + // 在Cache锁以外关闭文件。 + // 关闭文件会影响refCount,所以无论是上传失败还是上传成功,都会在等待一段时间后才进行下一阶段的操作 + for _, obj := range p.upObjs { + obj.reader.Close() + } } } + +type cacheFileReader struct { + rw *CacheFileReadWriter + pos int64 +} + +func (r *cacheFileReader) Read(p []byte) (int, error) { + n, err := r.rw.ReadAt(p, r.pos) + r.pos += int64(n) + if err != nil { + return n, err + } + + if n != len(p) { + return n, io.EOF + } + + return n, nil +} diff --git a/client2/internal/mount/vfs/cache/file.go b/client2/internal/mount/vfs/cache/file.go index b7e8751..18c1bd6 100644 --- a/client2/internal/mount/vfs/cache/file.go +++ b/client2/internal/mount/vfs/cache/file.go @@ -20,8 +20,8 @@ type FileInfo struct { // 文件总大小。可能会超过对应的远端文件的大小。 // 此大小可能与本地缓存文件大小也不同,需要定时将本地缓存文件大小修正到与这个值相同。 Size int64 - // 本文件是否有未提交的修改 - Dirty bool + // 如果大于0,则代表有未提交的修改 + Revision int // 数据段列表,按照段开始位置从小到大排列 Segments []*Range // 文件对应的对象ID,仅在文件是一个缓存文件时才有值 @@ -80,24 +80,29 @@ func (r *Range) End() int64 { type CacheFile struct { cache *Cache pathComps []string - name string info FileInfo remoteObj *cdssdk.Object - infoRev int64 rwLock *sync.RWMutex readers []*CacheFileReadWriter writers []*CacheFileReadWriter saveMetaChan chan any + noSaveMeta bool // 防止在Unload之后又保存了文件 isDeleted bool - isFreed bool metaFile *os.File dataFile *os.File writeLock *sync.RWMutex - // 下面的字段不受rwLock保护! - refCount int - freeTime time.Time + // 缓存文件的状态,用于管理缓存文件的生命周期。不受rwLock保护,而是由Cache管理 + state cacheState +} + +type cacheState struct { + refCount int + freeTime time.Time + unloadTime time.Time + isLoaded bool + uploading *uploadingObject } func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) { @@ -105,9 +110,9 @@ func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) { dataPath := cache.GetCacheDataPath(pathComps...) info := FileInfo{ - Dirty: true, - ModTime: time.Now(), - Perm: 0777, + Revision: 1, + ModTime: time.Now(), + Perm: 0777, } infoData, err := serder.ObjectToJSON(info) @@ -135,16 +140,18 @@ func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) { ch := &CacheFile{ cache: cache, pathComps: pathComps, - name: pathComps[len(pathComps)-1], info: info, rwLock: &sync.RWMutex{}, saveMetaChan: make(chan any, 1), metaFile: metaFile, dataFile: dataFile, writeLock: &sync.RWMutex{}, + state: cacheState{ + isLoaded: true, + }, } - go ch.serving() + go ch.serving(ch.saveMetaChan) return ch, nil } @@ -176,16 +183,18 @@ func loadCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) { ch := &CacheFile{ cache: cache, pathComps: pathComps, - name: pathComps[len(pathComps)-1], info: *info, rwLock: &sync.RWMutex{}, saveMetaChan: make(chan any, 1), metaFile: metaFile, dataFile: dataFile, writeLock: &sync.RWMutex{}, + state: cacheState{ + isLoaded: true, + }, } - go ch.serving() + go ch.serving(ch.saveMetaChan) return ch, nil } @@ -226,7 +235,6 @@ func newCacheFileFromObject(cache *Cache, pathComps []string, obj *cdssdk.Object ch := &CacheFile{ cache: cache, pathComps: pathComps, - name: pathComps[len(pathComps)-1], info: info, remoteObj: obj, rwLock: &sync.RWMutex{}, @@ -234,9 +242,12 @@ func newCacheFileFromObject(cache *Cache, pathComps []string, obj *cdssdk.Object metaFile: metaFile, dataFile: dataFile, writeLock: &sync.RWMutex{}, + state: cacheState{ + isLoaded: true, + }, } - go ch.serving() + go ch.serving(ch.saveMetaChan) return ch, nil } @@ -264,30 +275,73 @@ func loadCacheFileInfo(cache *Cache, pathComps []string) (*CacheEntryInfo, error }, nil } -func (f *CacheFile) PathComps() []string { - return f.pathComps -} +// 加载缓存文件。如果已经加载了,则无任何效果 +func (f *CacheFile) Load() error { + f.rwLock.Lock() + defer f.rwLock.Unlock() -func (f *CacheFile) Name() string { - return f.name -} + if f.isDeleted { + return fmt.Errorf("cache deleted") + } -func (f *CacheFile) Size() int64 { - return f.info.Size -} + metaPath := f.cache.GetCacheMetaPath(f.pathComps...) + dataPath := f.cache.GetCacheDataPath(f.pathComps...) + + metaFile, err := os.OpenFile(metaPath, os.O_RDWR, 0644) + if err != nil { + return err + } -func (f *CacheFile) Mode() os.FileMode { - return f.info.Perm + dataFile, err := os.OpenFile(dataPath, os.O_RDWR, 0644) + if err != nil { + metaFile.Close() + return err + } + + f.saveMetaChan = make(chan any) + f.noSaveMeta = false + f.metaFile = metaFile + f.dataFile = dataFile + + go f.serving(f.saveMetaChan) + return nil } -func (f *CacheFile) ModTime() time.Time { - return f.info.ModTime +// 关闭缓存文件,保存元数据。但缓存对象依然会留在内存里,以备随时查询元数据。 +// +// 只应该在引用计数为0时调用。 +func (f *CacheFile) Unload() { + f.rwLock.Lock() + defer f.rwLock.Unlock() + + if !f.isDeleted { + // TODO 日志 + f.saveMeta() + } + + // 防止在关闭缓存后又保存了文件 + close(f.saveMetaChan) + f.saveMetaChan = nil + f.noSaveMeta = true + f.metaFile.Close() + f.dataFile.Close() } -func (f *CacheFile) IsDir() bool { - return false +// 可在Unload状态下调用 +func (f *CacheFile) RevisionUploaded(rev int) { + f.rwLock.Lock() + defer f.rwLock.Unlock() + + if f.info.Revision == rev { + f.info.Revision = 0 + } + + if f.saveMetaChan != nil { + f.letSave() + } } +// 可在Unload状态下调用 func (f *CacheFile) Info() CacheEntryInfo { return CacheEntryInfo{ PathComps: f.pathComps, @@ -298,6 +352,13 @@ func (f *CacheFile) Info() CacheEntryInfo { } } +func (f *CacheFile) Revision() int { + f.rwLock.RLock() + defer f.rwLock.RUnlock() + return f.info.Revision +} + +// 可在Unload状态下调用 func (f *CacheFile) Delete() { f.writeLock.Lock() defer f.writeLock.Unlock() @@ -309,11 +370,16 @@ func (f *CacheFile) Delete() { dataPath := f.cache.GetCacheDataPath(f.pathComps...) os.Remove(metaPath) os.Remove(dataPath) + + // 可能是在被使用状态下删除,也可能是在Unload状态下删除,所以这里不关闭saveMetaChan,而是设置isDeleted为true f.isDeleted = true - f.letSave() + if f.saveMetaChan != nil { + f.letSave() + } } +// 可在Unload状态下调用 func (f *CacheFile) Move(newPathComps []string) { f.writeLock.Lock() defer f.writeLock.Unlock() @@ -322,31 +388,29 @@ func (f *CacheFile) Move(newPathComps []string) { defer f.rwLock.Unlock() f.pathComps = newPathComps - f.name = newPathComps[len(newPathComps)-1] - f.letSave() + if f.saveMetaChan != nil { + f.letSave() + } } // 打开一个写入句柄,同时支持读取 +// +// 不可在Unload状态下调用! func (f *CacheFile) Open(flags uint32) *CacheFileReadWriter { - logger.Tracef("CacheFile.Open: %v, %#x", f.name, flags) + logger.Tracef("CacheFile.Open: %v, %#x", f.pathComps, flags) f.cache.lock.Lock() + f.state.refCount++ + f.cache.lock.Unlock() f.rwLock.Lock() defer f.rwLock.Unlock() - f.refCount++ - if f.refCount == 1 && !f.isDeleted { - f.cache.freeCache = lo2.Remove(f.cache.freeCache, f) - } - - // 提前释放Cache的锁 - f.cache.lock.Unlock() - h := &CacheFileReadWriter{ file: f, remoteLock: &sync.Mutex{}, + revision: f.info.Revision, } if flags&uint32(os.O_RDWR) == uint32(os.O_RDWR) { @@ -371,18 +435,43 @@ func (f *CacheFile) Open(flags uint32) *CacheFileReadWriter { return h } +// 打开一个读取句柄,用于同步本地文件到远端。由于此方法会在扫描缓存时调用,所以refCount增加时不需要加锁 +// +// 不可在Unload状态下调用! +func (f *CacheFile) OpenReadWhenScanning() *CacheFileReadWriter { + f.rwLock.Lock() + defer f.rwLock.Unlock() + + f.state.refCount++ + + h := &CacheFileReadWriter{ + file: f, + remoteLock: &sync.Mutex{}, + revision: f.info.Revision, + readable: true, + } + + if f.remoteObj != nil { + h.remote = newRemoteLoader(f) + } + + f.readers = append(f.readers, h) + return h +} + +// 不可在Unload状态下调用! func (f *CacheFile) SetModTime(modTime time.Time) error { logger.Tracef("CacheFile.SetModTime: %v, %v", f.pathComps, modTime) f.rwLock.Lock() f.info.ModTime = modTime - f.infoRev++ f.rwLock.Unlock() f.letSave() return nil } +// 不可在Unload状态下调用! func (f *CacheFile) Truncate(size int64) error { logger.Tracef("CacheFile.Truncate: %v, %v", f.pathComps, size) @@ -408,55 +497,32 @@ func (f *CacheFile) Truncate(size int64) error { f.info.Segments = AddRange(f.info.Segments, &Range{Position: f.info.Size, Length: size - f.info.Size}) } if f.info.Size != size { - f.info.Dirty = true + f.info.Revision++ } f.info.Size = size - f.infoRev++ f.letSave() return nil } -// 不再使用缓存文件 +// 减少一个引用计数 func (f *CacheFile) Release() { f.cache.lock.Lock() defer f.cache.lock.Unlock() - f.refCount-- - f.freeTime = time.Now() - - f.rwLock.RLock() - defer f.rwLock.RUnlock() - - if f.refCount == 0 && !f.isDeleted { - f.cache.freeCache = append(f.cache.freeCache, f) + f.state.refCount-- + if f.state.refCount == 0 { + f.state.freeTime = time.Now() } } -func (f *CacheFile) Free() { - f.rwLock.Lock() - defer f.rwLock.Unlock() - - if !f.isDeleted { - // TODO 日志 - f.saveMeta() - } - - // 防止在关闭缓存后又保存了文件 - f.isFreed = true - f.metaFile.Close() - f.dataFile.Close() - close(f.saveMetaChan) -} - -func (f *CacheFile) serving() { - savedInfoRev := int64(0) +func (f *CacheFile) serving(saveMetaChan chan any) { ticker := time.NewTicker(time.Second * 5) defer ticker.Stop() for { select { - case _, ok := <-f.saveMetaChan: + case _, ok := <-saveMetaChan: if !ok { return } @@ -472,24 +538,15 @@ func (f *CacheFile) serving() { break } - // 如果缓存已经被释放,就不要再保存元数据了 - if f.isFreed { + // 停止保存元数据的线程 + if f.noSaveMeta { f.rwLock.Unlock() break } - for { - if f.infoRev == savedInfoRev { - break - } - - // TODO 错误日志 - f.saveMeta() - f.metaFile.Sync() - - savedInfoRev = f.infoRev - break - } + // TODO 错误日志 + f.saveMeta() + f.metaFile.Sync() f.rwLock.Unlock() } @@ -532,6 +589,7 @@ type CacheFileReadWriter struct { writeable bool remote *RemoteLoader remoteLock *sync.Mutex + revision int // 打开文件时,文件的版本号 } func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) { @@ -539,8 +597,9 @@ func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) { return 0, fuse.ErrPermission } - logger.Tracef("CacheFileReadWriter.ReadAt: %v, %v, %v", h.file.name, off, len(buf)) + logger.Tracef("CacheFileReadWriter.ReadAt: %v, %v, %v", h.file.pathComps, off, len(buf)) + // 读取数据必须读满整个buf,否则就会被认为是文件已经结束了 totalReadLen := 0 for totalReadLen < len(buf) { curBuf := buf[totalReadLen:] @@ -553,7 +612,7 @@ func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) { break } - // 先尝试从本地缓存文件里读取 + /// 1. 先尝试从本地缓存文件里读取 rngIdx := FirstContainsIndex(h.file.info.Segments, curOff) if rngIdx >= 0 && h.file.info.Segments[rngIdx].End() > curOff { readLen := math2.Min(int64(len(curBuf)), h.file.info.Segments[rngIdx].End()-curOff) @@ -568,7 +627,7 @@ func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) { continue } - // 否则从远端下载 + // 否则从远端下载,计算一下要加载的长度 loadLen := math2.Min(int64(len(curBuf)), h.file.info.ObjectSize-curOff) if rngIdx+1 < len(h.file.info.Segments) { // 最多加载到下一个段的开头 @@ -577,13 +636,16 @@ func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) { h.file.rwLock.RUnlock() + /// 2. 开始从远端下载数据 + if h.remote == nil { return totalReadLen, fmt.Errorf("no remote file") } fmt.Printf("load from remote\n") - // 加锁,防止并发Seek + // 由于RemoteLoader的Load方法没有加锁,所以这里要加锁,防止并发Seek导致的问题 + // 可以考虑在RemoteLoader里加锁,这样可以实现跨Writer共用Loader h.remoteLock.Lock() realLoadLen, err := h.remote.Load(curBuf[:loadLen], curOff) totalReadLen += realLoadLen @@ -592,9 +654,10 @@ func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) { return totalReadLen, err } h.remoteLock.Unlock() - logger.Tracef("load from remote: %v", realLoadLen) + /// 3. 数据加载完毕,写入到本地文件 + // 在写入到本地之前,先停止其他的写入,防止冲突 h.file.writeLock.Lock() @@ -626,9 +689,8 @@ func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) { // 提交到段列表里 h.file.rwLock.Lock() h.file.info.Segments = AddRange(h.file.info.Segments, loadRng) - h.file.infoRev++ - h.file.letSave() h.file.rwLock.Unlock() + h.file.letSave() } return totalReadLen, nil @@ -639,7 +701,7 @@ func (h *CacheFileReadWriter) WriteAt(buf []byte, off int64) (int, error) { return 0, fuse.ErrPermission } - logger.Tracef("CacheFileReadWriter.WriteAt: %v, %v, %v", h.file.name, off, len(buf)) + logger.Tracef("CacheFileReadWriter.WriteAt: %v, %v, %v", h.file.pathComps, off, len(buf)) // 允许多线程并行写入,但在数据加载期间不能写入 h.file.writeLock.RLock() @@ -657,8 +719,7 @@ func (h *CacheFileReadWriter) WriteAt(buf []byte, off int64) (int, error) { h.file.info.Segments = AddRange(h.file.info.Segments, &Range{Position: off, Length: int64(writeLen)}) h.file.info.Size = math2.Max(h.file.info.Size, off+int64(writeLen)) - h.file.info.Dirty = true - h.file.infoRev++ + h.file.info.Revision++ h.file.letSave() @@ -677,17 +738,15 @@ func (f *CacheFileReadWriter) Close() error { } f.file.cache.lock.Lock() - defer f.file.cache.lock.Unlock() + f.file.state.refCount-- + if f.file.state.refCount == 0 { + f.file.state.freeTime = time.Now() + } + f.file.cache.lock.Unlock() f.file.rwLock.Lock() defer f.file.rwLock.Unlock() - f.file.refCount-- - if f.file.refCount == 0 && !f.file.isDeleted { - f.file.cache.freeCache = append(f.file.cache.freeCache, f.file) - f.file.freeTime = time.Now() - } - if f.writeable { f.file.writers = lo2.Remove(f.file.writers, f) } else if f.readable { diff --git a/client2/internal/mount/vfs/vfs.go b/client2/internal/mount/vfs/vfs.go index cf385bb..4a8d4cd 100644 --- a/client2/internal/mount/vfs/vfs.go +++ b/client2/internal/mount/vfs/vfs.go @@ -1,13 +1,12 @@ package vfs import ( - "path/filepath" - "gitlink.org.cn/cloudream/storage/client2/internal/mount/config" "gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse" "gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache" "gitlink.org.cn/cloudream/storage/common/pkgs/db2" "gitlink.org.cn/cloudream/storage/common/pkgs/downloader" + "gitlink.org.cn/cloudream/storage/common/pkgs/uploader" ) type Vfs struct { @@ -16,11 +15,11 @@ type Vfs struct { cache *cache.Cache } -func NewVfs(cfg *config.Config, db *db2.DB, downloader *downloader.Downloader) *Vfs { +func NewVfs(cfg *config.Config, db *db2.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Vfs { return &Vfs{ db: db, config: cfg, - cache: cache.NewCache(db, downloader, filepath.Join(cfg.CacheDir, "data"), filepath.Join(cfg.CacheDir, "meta")), + cache: cache.NewCache(cfg, db, uploader, downloader), } } diff --git a/common/pkgs/uploader/update.go b/common/pkgs/uploader/update.go index 4000c6e..d0078ad 100644 --- a/common/pkgs/uploader/update.go +++ b/common/pkgs/uploader/update.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage/common/globals" @@ -84,6 +85,29 @@ func (w *UpdateUploader) Upload(pat string, stream io.Reader) error { return nil } +// 取消上传对象。必须在对象调用了Upload之后调用。 +func (w *UpdateUploader) CancelObject(path string) { + w.lock.Lock() + defer w.lock.Unlock() + + w.successes = lo.Reject(w.successes, func(e coormq.AddObjectEntry, i int) bool { + return e.Path == path + }) +} + +// 重命名对象。必须在对象调用了Upload之后调用。不会检查新路径是否已经存在,需由调用方去做保证。 +func (w *UpdateUploader) RenameObject(path string, newPath string) { + w.lock.Lock() + defer w.lock.Unlock() + + for i := range w.successes { + if w.successes[i].Path == path { + w.successes[i].Path = newPath + break + } + } +} + func (w *UpdateUploader) Commit() (UpdateResult, error) { w.lock.Lock() defer w.lock.Unlock()