package cache

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sync"
	"time"

	"gitlink.org.cn/cloudream/common/pkgs/logger"
	"gitlink.org.cn/cloudream/common/utils/io2"
	"gitlink.org.cn/cloudream/common/utils/lo2"
	"gitlink.org.cn/cloudream/common/utils/math2"
	"gitlink.org.cn/cloudream/common/utils/serder"
	"gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse"
	clitypes "gitlink.org.cn/cloudream/storage2/client/types"
)

// FileInfo is the metadata of a cache file. It is serialized with serder.ObjectToJSON and
// stored in the cache meta file that accompanies the cache data file.
type FileInfo struct {
	// Total size of the file. It may be larger than the corresponding remote file.
	// It may also differ from the size of the local cache data file; the local data file
	// needs to be periodically resized to match this value.
	Size int64
	// A value greater than 0 means there are modifications that have not been committed yet.
	Revision int
	// Ranges of valid data in the local data file, sorted by start position in ascending order.
	Segments []*Range
	// ID of the corresponding object; only set when the file is a cache file.
	// ObjectID cdssdk.ObjectID
	// Size of the corresponding object; only meaningful when the file is a cache file.
	// It is the amount of data that may be loaded from the remote side, so it can be smaller
	// than the actual remote size.
	ObjectSize int64
	// If the file is entirely a cache file, this records the hash of its content, used to check
	// whether the remote file has been modified when downloading cached data.
	// Hash cdssdk.FileHash
	// Last modification time of the file.
	ModTime time.Time
	// Permission bits of the file.
	Perm os.FileMode
}

// Clone returns a copy of f. The Segments slice and every Range in it are freshly allocated,
// so the copy can be used (e.g. serialized) without holding the lock that guards f.
func (f *FileInfo) Clone() FileInfo {
	n := *f
	n.Segments = make([]*Range, len(f.Segments))
	for i, seg := range f.Segments {
		n.Segments[i] = &Range{
			Position: seg.Position,
			Length:   seg.Length,
		}
	}
	return n
}

// Range is a half-open byte range [Position, Position+Length).
type Range struct {
	Position int64
	Length   int64
}

func (r *Range) GetPosition() int64 {
	return r.Position
}

func (r *Range) SetPosition(pos int64) {
	r.Position = pos
}

func (r *Range) GetLength() int64 {
	return r.Length
}

func (r *Range) SetLength(length int64) {
	r.Length = length
}

// End returns the exclusive end offset of the range.
func (r *Range) End() int64 {
	return r.Position + r.Length
}

// CacheFile is the in-memory state of one cached file. All reads and writes of the same file
// share a single CacheFile object.
// Do not store this struct inside other objects.
type CacheFile struct {
	cache     *Cache
	pathComps []string
	info      FileInfo
	remoteObj *clitypes.Object
	// Guards info, the handle lists and the load/delete state below.
	rwLock  *sync.RWMutex
	readers []*CacheFileHandle
	writers []*CacheFileHandle
	// Wakes the metadata-saving goroutine. It is nil while the file is unloaded.
	saveMetaChan chan any
	// Prevents the file from being saved again after Unload.
	noSaveMeta bool
	isDeleted  bool
	metaFile   *os.File
	dataFile   *os.File
	// Held shared by WriteAt and exclusively by operations that must not interleave with writes
	// (Truncate, Delete, Move, and writing downloaded data back in ReadAt).
	writeLock *sync.RWMutex
	// Lifecycle state of the cache file. It is not protected by rwLock; it is managed by Cache.
	state cacheState
}

type cacheState struct {
	refCount   int
	freeTime   time.Time
	unloadTime time.Time
	isLoaded   bool
	uploading  *uploadingObject
}

// createNewCacheFile creates an empty cache file for pathComps, truncating any existing meta and
// data files, and starts its metadata-saving goroutine. The new file has Revision 1, i.e. it
// counts as modified.
func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
	metaPath := cache.GetCacheMetaPath(pathComps...)
	dataPath := cache.GetCacheDataPath(pathComps...)

	info := FileInfo{
		Revision: 1,
		ModTime:  time.Now(),
		Perm:     0777,
	}

	infoData, err := serder.ObjectToJSON(info)
	if err != nil {
		return nil, err
	}

	err = os.MkdirAll(filepath.Dir(metaPath), 0755)
	if err != nil {
		return nil, err
	}

	metaFile, err := os.OpenFile(metaPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
	if err != nil {
		return nil, fmt.Errorf("create cache meta file: %w", err)
	}

	err = io2.WriteAll(metaFile, infoData)
	if err != nil {
		metaFile.Close()
		return nil, fmt.Errorf("save cache meta file: %w", err)
	}

	dataFile, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
	if err != nil {
		metaFile.Close()
		return nil, fmt.Errorf("create cache data file: %w", err)
	}

	ch := &CacheFile{
		cache:        cache,
		pathComps:    pathComps,
		info:         info,
		rwLock:       &sync.RWMutex{},
		saveMetaChan: make(chan any, 1),
		metaFile:     metaFile,
		dataFile:     dataFile,
		writeLock:    &sync.RWMutex{},
		state: cacheState{
			isLoaded: true,
		},
	}
	go ch.serving(ch.saveMetaChan)

	return ch, nil
}

// loadCacheFile opens an existing cache file for pathComps. The data file must exist. If the
// meta file is missing, the metadata is rebuilt from the data file and the file is treated as
// modified.
//
// The error from opening the data file is returned unwrapped on purpose.
func loadCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
	metaPath := cache.GetCacheMetaPath(pathComps...)
	dataPath := cache.GetCacheDataPath(pathComps...)

	dataFile, err := os.OpenFile(dataPath, os.O_RDWR, 0644)
	if err != nil {
		// Do not wrap this error.
		return nil, err
	}

	info := &FileInfo{}

	metaFile, err := os.OpenFile(metaPath, os.O_RDWR, 0644)
	if err != nil {
		// The data file exists but the meta file does not: create a meta file for it.
		if !os.IsNotExist(err) {
			dataFile.Close()
			return nil, err
		}

		stat, err := dataFile.Stat()
		if err != nil {
			dataFile.Close()
			return nil, err
		}

		err = os.MkdirAll(filepath.Dir(metaPath), 0755)
		if err != nil {
			dataFile.Close()
			return nil, err
		}

		metaFile, err = os.OpenFile(metaPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
		if err != nil {
			dataFile.Close()
			return nil, err
		}

		info.Size = stat.Size()
		info.ModTime = stat.ModTime()
		info.Perm = stat.Mode().Perm()
		info.Segments = []*Range{{Position: 0, Length: info.Size}}
		info.Revision = 1 // A file that has never been synced counts as modified.
	} else {
		err = serder.JSONToObjectStream(metaFile, info)
		if err != nil {
			// Close both files; previously the meta file leaked on a decode failure.
			metaFile.Close()
			dataFile.Close()
			return nil, err
		}
	}

	ch := &CacheFile{
		cache:        cache,
		pathComps:    pathComps,
		info:         *info,
		rwLock:       &sync.RWMutex{},
		saveMetaChan: make(chan any, 1),
		metaFile:     metaFile,
		dataFile:     dataFile,
		writeLock:    &sync.RWMutex{},
		state: cacheState{
			isLoaded: true,
		},
	}
	go ch.serving(ch.saveMetaChan)

	return ch, nil
}

// newCacheFileFromObject creates a cache file that mirrors the remote object obj. No data is
// cached yet (the data file is created empty and Segments is empty); reads fetch it from the
// remote side. Revision is 0, so the file starts out unmodified.
func newCacheFileFromObject(cache *Cache, pathComps []string, obj *clitypes.Object) (*CacheFile, error) {
	metaPath := cache.GetCacheMetaPath(pathComps...)
	dataPath := cache.GetCacheDataPath(pathComps...)

	info := FileInfo{
		Size:       obj.Size,
		ObjectSize: obj.Size,
		ModTime:    obj.UpdateTime,
		Perm:       0755,
	}

	infoData, err := serder.ObjectToJSON(info)
	if err != nil {
		return nil, err
	}

	err = os.MkdirAll(filepath.Dir(metaPath), 0755)
	if err != nil {
		return nil, err
	}

	metaFile, err := os.OpenFile(metaPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
	if err != nil {
		return nil, fmt.Errorf("create cache meta file: %w", err)
	}

	err = io2.WriteAll(metaFile, infoData)
	if err != nil {
		metaFile.Close()
		return nil, fmt.Errorf("save cache meta file: %w", err)
	}

	dataFile, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
	if err != nil {
		metaFile.Close()
		return nil, fmt.Errorf("create cache file: %w", err)
	}

	ch := &CacheFile{
		cache:        cache,
		pathComps:    pathComps,
		info:         info,
		remoteObj:    obj,
		rwLock:       &sync.RWMutex{},
		saveMetaChan: make(chan any, 1),
		metaFile:     metaFile,
		dataFile:     dataFile,
		writeLock:    &sync.RWMutex{},
		state: cacheState{
			isLoaded: true,
		},
	}
	go ch.serving(ch.saveMetaChan)

	return ch, nil
}

// loadCacheFileInfo returns the entry info of a cache file without loading it. It uses the
// meta file when it exists and falls back to dataFileInfo when the meta file is missing.
func loadCacheFileInfo(cache *Cache, pathComps []string, dataFileInfo os.FileInfo) (*CacheEntryInfo, error) {
	metaPath := cache.GetCacheMetaPath(pathComps...)

	metaData, err := os.ReadFile(metaPath)
	if err == nil {
		info := &FileInfo{}
		err = serder.JSONToObject(metaData, info)
		if err != nil {
			return nil, err
		}

		return &CacheEntryInfo{
			PathComps: pathComps,
			Size:      info.Size,
			Mode:      info.Perm,
			ModTime:   info.ModTime,
			IsDir:     false,
		}, nil
	}

	if !os.IsNotExist(err) {
		return nil, err
	}

	return &CacheEntryInfo{
		PathComps: pathComps,
		Size:      dataFileInfo.Size(),
		Mode:      dataFileInfo.Mode(),
		ModTime:   dataFileInfo.ModTime(),
		IsDir:     false,
	}, nil
}

// Load reopens the meta and data files and restarts the metadata-saving goroutine.
// It has no effect if the file is already loaded, and fails if the file has been deleted.
func (f *CacheFile) Load() error {
	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	if f.isDeleted {
		return fmt.Errorf("cache deleted")
	}

	// saveMetaChan is non-nil exactly while the file is loaded (set by the constructors and
	// Load, cleared by Unload). Reopening now would leak the current file handles and start a
	// second saving goroutine.
	if f.saveMetaChan != nil {
		return nil
	}

	metaPath := f.cache.GetCacheMetaPath(f.pathComps...)
	dataPath := f.cache.GetCacheDataPath(f.pathComps...)

	metaFile, err := os.OpenFile(metaPath, os.O_RDWR, 0644)
	if err != nil {
		return err
	}

	dataFile, err := os.OpenFile(dataPath, os.O_RDWR, 0644)
	if err != nil {
		metaFile.Close()
		return err
	}

	// Buffered like in the constructors, so letSave's non-blocking send is not dropped while
	// the serving goroutine is busy saving.
	f.saveMetaChan = make(chan any, 1)
	f.noSaveMeta = false
	f.metaFile = metaFile
	f.dataFile = dataFile

	go f.serving(f.saveMetaChan)

	return nil
}

// Unload saves the metadata (unless the file was deleted), stops the metadata-saving goroutine,
// and closes the meta and data files. The CacheFile itself stays in memory so its metadata can
// still be queried.
//
// Should only be called when the reference count is 0. Calling it on a file that is already
// unloaded has no effect.
func (f *CacheFile) Unload() {
	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	// Already unloaded: closing the nil channel below would panic.
	if f.saveMetaChan == nil {
		return
	}

	if !f.isDeleted {
		// TODO log the error
		// NOTE(review): the serving goroutine calls saveMeta without holding rwLock, so it may be
		// writing the meta file at the same time as this call.
		f.saveMeta(f.info)
	}

	// Stop the saving goroutine so the file is not saved again after it has been closed.
	close(f.saveMetaChan)
	f.saveMetaChan = nil
	f.noSaveMeta = true

	f.metaFile.Close()
	f.dataFile.Close()
}

// RevisionUploaded marks revision rev as uploaded. Revision is reset to 0 only if it still
// equals rev, i.e. nothing was modified after that revision was taken. May be called while
// unloaded.
func (f *CacheFile) RevisionUploaded(rev int) {
	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	if f.info.Revision == rev {
		f.info.Revision = 0
	}

	if f.saveMetaChan != nil {
		f.letSave()
	}
}

// Info returns the entry info of the file. May be called while unloaded.
func (f *CacheFile) Info() CacheEntryInfo {
	// Size and ModTime are modified concurrently by writers under rwLock, so read them under it
	// too (as Revision does).
	f.rwLock.RLock()
	defer f.rwLock.RUnlock()

	return CacheEntryInfo{
		PathComps: f.pathComps,
		Size:      f.info.Size,
		Mode:      f.info.Perm,
		ModTime:   f.info.ModTime,
		IsDir:     false,
	}
}

// Revision returns the current revision; a value greater than 0 means there are uncommitted
// modifications.
func (f *CacheFile) Revision() int {
	f.rwLock.RLock()
	defer f.rwLock.RUnlock()
	return f.info.Revision
}

// Delete removes the meta and data files from disk and marks the file as deleted. The open file
// handles are not closed here. May be called while unloaded.
func (f *CacheFile) Delete() {
	f.writeLock.Lock()
	defer f.writeLock.Unlock()

	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	metaPath := f.cache.GetCacheMetaPath(f.pathComps...)
	dataPath := f.cache.GetCacheDataPath(f.pathComps...)
	os.Remove(metaPath)
	os.Remove(dataPath)

	// The file may be deleted either while in use or while unloaded, so saveMetaChan is not
	// closed here; isDeleted is set instead, which makes the saving goroutine stop.
	f.isDeleted = true

	if f.saveMetaChan != nil {
		f.letSave()
	}
}

// Move only records the new path components; it does not touch any file on disk.
// May be called while unloaded.
func (f *CacheFile) Move(newPathComps []string) {
	f.writeLock.Lock()
	defer f.writeLock.Unlock()

	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	f.pathComps = newPathComps

	if f.saveMetaChan != nil {
		f.letSave()
	}
}

// Open opens a handle on the file. Its read/write permissions are derived from flags;
// handles opened for writing also support reading when opened with os.O_RDWR.
//
// Must not be called while the file is unloaded!
func (f *CacheFile) Open(flags uint32) *CacheFileHandle { logger.Tracef("CacheFile.Open: %v, %#x", f.pathComps, flags) f.cache.lock.Lock() f.state.refCount++ f.cache.lock.Unlock() f.rwLock.Lock() defer f.rwLock.Unlock() h := &CacheFileHandle{ file: f, remoteLock: &sync.Mutex{}, revision: f.info.Revision, } if flags&uint32(os.O_RDWR) == uint32(os.O_RDWR) { h.readable = true h.writeable = true } else if flags&uint32(os.O_WRONLY) == uint32(os.O_WRONLY) { h.writeable = true } else if flags&uint32(os.O_RDONLY) == uint32(os.O_RDONLY) { h.readable = true } if f.remoteObj != nil { h.remote = newRemoteLoader(f) } if h.writeable { f.writers = append(f.writers, h) } else { f.readers = append(f.readers, h) } return h } // 打开一个读取句柄,用于同步本地文件到远端。由于此方法会在扫描缓存时调用,所以refCount增加时不需要加锁 // // 不可在Unload状态下调用! func (f *CacheFile) OpenReadWhenScanning() *CacheFileHandle { f.rwLock.Lock() defer f.rwLock.Unlock() f.state.refCount++ h := &CacheFileHandle{ file: f, remoteLock: &sync.Mutex{}, revision: f.info.Revision, readable: true, } if f.remoteObj != nil { h.remote = newRemoteLoader(f) } f.readers = append(f.readers, h) return h } // 不可在Unload状态下调用! func (f *CacheFile) SetModTime(modTime time.Time) error { logger.Tracef("CacheFile.SetModTime: %v, %v", f.pathComps, modTime) f.rwLock.Lock() f.info.ModTime = modTime f.rwLock.Unlock() f.letSave() return nil } // 不可在Unload状态下调用! 
func (f *CacheFile) Truncate(size int64) error { logger.Tracef("CacheFile.Truncate: %v, %v", f.pathComps, size) // 修改文件大小前不允许写入 f.writeLock.Lock() defer f.writeLock.Unlock() err := f.dataFile.Truncate(size) if err != nil { return err } f.rwLock.Lock() defer f.rwLock.Unlock() // 调整能从远端下载的大小 f.info.ObjectSize = math2.Min(f.info.ObjectSize, size) // 调整本地缓存文件里的有效数据大小 if size < f.info.Size { f.info.Segments = TruncateRange(f.info.Segments, size) } else if size > f.info.Size { f.info.Segments = AddRange(f.info.Segments, &Range{Position: f.info.Size, Length: size - f.info.Size}) } if f.info.Size != size { f.info.Revision++ } f.info.Size = size f.letSave() return nil } // 减少一个引用计数 func (f *CacheFile) Release() { f.cache.lock.Lock() defer f.cache.lock.Unlock() f.state.refCount-- if f.state.refCount == 0 { f.state.freeTime = time.Now() } } func (f *CacheFile) serving(saveMetaChan chan any) { ticker := time.NewTicker(time.Second * 5) defer ticker.Stop() for { select { case _, ok := <-saveMetaChan: if !ok { return } case <-ticker.C: } f.rwLock.RLock() info := f.info.Clone() // 如果文件已被删除,则不能再保存元数据,防止覆盖掉新创建的同名文件 if f.isDeleted { f.rwLock.RUnlock() break } // 停止保存元数据的线程 if f.noSaveMeta { f.rwLock.RUnlock() break } f.rwLock.RUnlock() // TODO 错误日志 f.saveMeta(info) f.metaFile.Sync() } } func (f *CacheFile) saveMeta(info FileInfo) error { jsonData, err := serder.ObjectToJSON(info) if err != nil { return err } err = f.metaFile.Truncate(0) if err != nil { return err } _, err = f.metaFile.Seek(0, io.SeekStart) if err != nil { return err } err = io2.WriteAll(f.metaFile, jsonData) if err != nil { return err } return nil } func (f *CacheFile) letSave() { select { case f.saveMetaChan <- nil: default: } } type CacheFileHandle struct { file *CacheFile readable bool writeable bool remote *RemoteLoader remoteLock *sync.Mutex revision int // 打开文件时,文件的版本号 } func (h *CacheFileHandle) ReadAt(buf []byte, off int64) (int, error) { log := logger.WithField("F", "CacheFileHandle.ReadAt") log.Tracef("buf: 
%v, off: %v", len(buf), off) if !h.readable { return 0, fuse.ErrPermission } logger.Tracef("CacheFileReadWriter.ReadAt: %v, %v, %v", h.file.pathComps, off, len(buf)) // 读取数据必须读满整个buf,否则就会被认为是文件已经结束了 totalReadLen := 0 for totalReadLen < len(buf) { curBuf := buf[totalReadLen:] curOff := off + int64(totalReadLen) h.file.rwLock.RLock() if curOff >= h.file.info.Size { h.file.rwLock.RUnlock() break } /// 1. 先尝试从本地缓存文件里读取 rngIdx := FirstContainsIndex(h.file.info.Segments, curOff) if rngIdx >= 0 && h.file.info.Segments[rngIdx].End() > curOff { readLen := math2.Min(int64(len(curBuf)), h.file.info.Segments[rngIdx].End()-curOff) realReadLen, err := h.file.dataFile.ReadAt(curBuf[:readLen], curOff) totalReadLen += realReadLen h.file.rwLock.RUnlock() logger.Tracef("read from local cache, n: %v, err: %v", realReadLen, err) if err != nil { return totalReadLen, err } continue } // 否则从远端下载,计算一下要加载的长度 loadLen := math2.Min(int64(len(curBuf)), h.file.info.ObjectSize-curOff) if rngIdx+1 < len(h.file.info.Segments) { // 最多加载到下一个段的开头 loadLen = math2.Min(loadLen, h.file.info.Segments[rngIdx+1].Position-curOff) } h.file.rwLock.RUnlock() /// 2. 开始从远端下载数据 if h.remote == nil { return totalReadLen, fmt.Errorf("no remote file") } // 由于RemoteLoader的Load方法没有加锁,所以这里要加锁,防止并发Seek导致的问题 // 可以考虑在RemoteLoader里加锁,这样可以实现跨Writer共用Loader h.remoteLock.Lock() realLoadLen, err := h.remote.Load(curBuf[:loadLen], curOff) totalReadLen += realLoadLen if err != nil { h.remoteLock.Unlock() return totalReadLen, err } h.remoteLock.Unlock() logger.Tracef("load from remote: %v", realLoadLen) /// 3. 
数据加载完毕,写入到本地文件 // 在写入到本地之前,先停止其他的写入,防止冲突 h.file.writeLock.Lock() // 停止其他写入后,就可以计算一下实际要写回的长度。 h.file.rwLock.RLock() loadRng := &Range{Position: curOff, Length: int64(realLoadLen)} DifferentRange(loadRng, h.file.info.Segments) h.file.rwLock.RUnlock() if loadRng.Length == 0 { h.file.writeLock.Unlock() continue } // 写入到本地缓存文件 writeStart := loadRng.Position - curOff _, err = h.file.dataFile.WriteAt(curBuf[writeStart:writeStart+loadRng.Length], curOff) if err != nil { h.file.writeLock.Unlock() logger.Tracef("save to local file: %v", err) return totalReadLen, fmt.Errorf("save to local file: %w", err) } logger.Tracef("save to local: %v", loadRng.Length) h.file.writeLock.Unlock() // 提交到段列表里 h.file.rwLock.Lock() h.file.info.Segments = AddRange(h.file.info.Segments, loadRng) h.file.rwLock.Unlock() h.file.letSave() } return totalReadLen, nil } func (h *CacheFileHandle) WriteAt(buf []byte, off int64) (int, error) { log := logger.WithField("F", "CacheFileHandle.WriteAt") log.Tracef("buf: %v, off: %v", len(buf), off) if !h.writeable { return 0, fuse.ErrPermission } logger.Tracef("CacheFileReadWriter.WriteAt: %v, %v, %v", h.file.pathComps, off, len(buf)) // 允许多线程并行写入,但在数据加载期间不能写入 h.file.writeLock.RLock() defer h.file.writeLock.RUnlock() // 写入到本地缓存文件 writeLen, err := h.file.dataFile.WriteAt(buf, off) if err != nil { return writeLen, fmt.Errorf("save to local file: %w", err) } // 提交到段列表里 h.file.rwLock.Lock() defer h.file.rwLock.Unlock() h.file.info.Segments = AddRange(h.file.info.Segments, &Range{Position: off, Length: int64(writeLen)}) h.file.info.Size = math2.Max(h.file.info.Size, off+int64(writeLen)) h.file.info.Revision++ h.file.letSave() return writeLen, nil } func (f *CacheFileHandle) Sync() error { return f.file.dataFile.Sync() } func (f *CacheFileHandle) Close() error { f.Sync() if f.remote != nil { f.remote.Close() } f.file.cache.lock.Lock() f.file.state.refCount-- if f.file.state.refCount == 0 { f.file.state.freeTime = time.Now() } f.file.cache.lock.Unlock() 
f.file.rwLock.Lock() defer f.file.rwLock.Unlock() if f.writeable { f.file.writers = lo2.Remove(f.file.writers, f) } else if f.readable { f.file.readers = lo2.Remove(f.file.readers, f) } return nil }