package cache

import (
	"fmt"
	"io"
	"os"
	"sync"
	"time"

	"gitlink.org.cn/cloudream/common/pkgs/logger"
	cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
	"gitlink.org.cn/cloudream/common/utils/io2"
	"gitlink.org.cn/cloudream/common/utils/lo2"
	"gitlink.org.cn/cloudream/common/utils/math2"
	"gitlink.org.cn/cloudream/common/utils/serder"
	"gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse"
)

// FileInfo is the metadata of a cached file. It is serialized to JSON and
// persisted into the cache's meta file by the savingMeta goroutine.
type FileInfo struct {
	// Total size of the file. May exceed the size of the corresponding remote
	// file. It may also differ from the size of the local cache data file;
	// the local file size needs to be periodically corrected to match this value.
	Size int64
	// Whether this file has uncommitted modifications.
	Dirty bool
	// List of valid data segments, sorted by segment start position ascending.
	Segments []*Range
	// Object ID the file corresponds to; only set when the file caches a remote object.
	// ObjectID cdssdk.ObjectID
	// Size of the corresponding remote object; only set when the file is a cache file.
	// This value represents how much data should be loaded from the remote side,
	// so it may be smaller than the actual remote size.
	ObjectSize int64
	// If this file is entirely a cache file, this field records the hash of its
	// content, used to check whether the remote file was modified when
	// downloading cached data.
	// Hash cdssdk.FileHash
	// Last modification time of the file.
	ModTime time.Time
	// Permission bits of the file.
	Perm os.FileMode
}

// Clone returns a deep copy of f: the Segments slice and every Range in it are
// duplicated so the copy can be mutated independently.
func (f *FileInfo) Clone() FileInfo {
	n := *f
	n.Segments = make([]*Range, len(f.Segments))
	for i, seg := range f.Segments {
		n.Segments[i] = &Range{
			Position: seg.Position,
			Length:   seg.Length,
		}
	}
	return n
}

// Range describes a contiguous span [Position, Position+Length) of valid data
// in the cache data file.
type Range struct {
	Position int64
	Length   int64
}

func (r *Range) GetPosition() int64 {
	return r.Position
}

func (r *Range) SetPosition(pos int64) {
	r.Position = pos
}

func (r *Range) GetLength() int64 {
	return r.Length
}

func (r *Range) SetLength(length int64) {
	r.Length = length
}

// End returns the exclusive end offset of the range.
func (r *Range) End() int64 {
	return r.Position + r.Length
}

// CacheFile is the in-memory state of one cached file. All read/write
// operations share the same CacheFile object.
// This struct should not be stored inside other objects.
type CacheFile struct {
	cache     *Cache
	pathComps []string
	name      string
	info      FileInfo
	remoteObj *cdssdk.Object
	// infoRev is bumped on every mutation of info; savingMeta compares it
	// against the last persisted revision to decide whether to rewrite the meta file.
	infoRev int64
	// rwLock guards info, infoRev, pathComps/name, isDeleted and the
	// readers/writers lists.
	rwLock  *sync.RWMutex
	readers []*CacheFileReadWriter
	writers []*CacheFileReadWriter
	// savingMetaChan (capacity 1) wakes the savingMeta goroutine; closing it
	// stops the goroutine.
	savingMetaChan chan any
	isDeleted      bool
	metaFile       *os.File
	dataFile       *os.File
	// writeLock serializes data-file writes against operations that must see a
	// quiescent data file (Truncate, Delete, ReadAt's write-back): writers take
	// RLock, exclusive operations take Lock.
	writeLock *sync.RWMutex
}

// createNewCacheFile creates a brand-new, empty, dirty cache file at the given
// path components, truncating any existing meta/data files, writes the initial
// metadata, and starts the background metadata-saving goroutine.
func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
	metaPath := cache.GetCacheMetaPath(pathComps...)
	dataPath := cache.GetCacheDataPath(pathComps...)

	info := FileInfo{
		Dirty:   true,
		ModTime: time.Now(),
		Perm:    0777,
	}
	infoData, err := serder.ObjectToJSON(info)
	if err != nil {
		return nil, err
	}

	metaFile, err := os.OpenFile(metaPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
	if err != nil {
		return nil, fmt.Errorf("create cache meta file: %w", err)
	}

	err = io2.WriteAll(metaFile, infoData)
	if err != nil {
		metaFile.Close()
		return nil, fmt.Errorf("save cache meta file: %w", err)
	}

	dataFile, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
	if err != nil {
		metaFile.Close()
		return nil, fmt.Errorf("create cache data file: %w", err)
	}

	ch := &CacheFile{
		cache:          cache,
		pathComps:      pathComps,
		name:           pathComps[len(pathComps)-1],
		info:           info,
		rwLock:         &sync.RWMutex{},
		savingMetaChan: make(chan any, 1),
		metaFile:       metaFile,
		dataFile:       dataFile,
		writeLock:      &sync.RWMutex{},
	}
	// Background goroutine that periodically persists metadata.
	go ch.savingMeta()
	return ch, nil
}

// loadCacheFile opens an existing cache file (creating missing meta/data files
// with O_CREATE, without truncation), parses its metadata, and starts the
// background metadata-saving goroutine.
// NOTE(review): if the meta file did not exist, O_CREATE yields an empty file
// and the JSON parse below presumably fails — confirm this is the intended
// "not cached" signal for callers.
func loadCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
	metaPath := cache.GetCacheMetaPath(pathComps...)
	dataPath := cache.GetCacheDataPath(pathComps...)

	metaFile, err := os.OpenFile(metaPath, os.O_CREATE|os.O_RDWR, 0644)
	if err != nil {
		return nil, fmt.Errorf("open cache meta file: %w", err)
	}

	info := &FileInfo{}
	err = serder.JSONToObjectStream(metaFile, info)
	if err != nil {
		metaFile.Close()
		return nil, err
	}

	dataFile, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR, 0644)
	if err != nil {
		metaFile.Close()
		return nil, fmt.Errorf("open cache data file: %w", err)
	}

	ch := &CacheFile{
		cache:          cache,
		pathComps:      pathComps,
		name:           pathComps[len(pathComps)-1],
		info:           *info,
		rwLock:         &sync.RWMutex{},
		savingMetaChan: make(chan any, 1),
		metaFile:       metaFile,
		dataFile:       dataFile,
		writeLock:      &sync.RWMutex{},
	}
	go ch.savingMeta()
	return ch, nil
}

// newCacheFileFromObject creates a cache file that mirrors a remote object:
// Size/ObjectSize come from the object, no local segments exist yet, and data
// is lazily fetched from the remote on read. Existing meta/data files are
// truncated. The background metadata-saving goroutine is started.
func newCacheFileFromObject(cache *Cache, pathComps []string, obj *cdssdk.Object) (*CacheFile, error) {
	metaPath := cache.GetCacheMetaPath(pathComps...)
	dataPath := cache.GetCacheDataPath(pathComps...)

	info := FileInfo{
		Size:       obj.Size,
		ObjectSize: obj.Size,
		ModTime:    obj.UpdateTime,
		Perm:       0755,
	}
	infoData, err := serder.ObjectToJSON(info)
	if err != nil {
		return nil, err
	}

	metaFile, err := os.OpenFile(metaPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
	if err != nil {
		return nil, fmt.Errorf("create cache meta file: %w", err)
	}

	err = io2.WriteAll(metaFile, infoData)
	if err != nil {
		metaFile.Close()
		return nil, fmt.Errorf("save cache meta file: %w", err)
	}

	dataFile, err := os.OpenFile(dataPath, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
	if err != nil {
		metaFile.Close()
		return nil, fmt.Errorf("create cache file: %w", err)
	}

	ch := &CacheFile{
		cache:          cache,
		pathComps:      pathComps,
		name:           pathComps[len(pathComps)-1],
		info:           info,
		remoteObj:      obj,
		rwLock:         &sync.RWMutex{},
		savingMetaChan: make(chan any, 1),
		metaFile:       metaFile,
		dataFile:       dataFile,
		writeLock:      &sync.RWMutex{},
	}
	go ch.savingMeta()
	return ch, nil
}

// loadCacheFileInfo reads only the metadata of a cached file and converts it
// into a CacheEntryInfo, without opening the data file or creating a CacheFile.
func loadCacheFileInfo(cache *Cache, pathComps []string) (*CacheEntryInfo, error) {
	metaPath := cache.GetCacheMetaPath(pathComps...)
	metaData, err := os.ReadFile(metaPath)
	if err != nil {
		return nil, err
	}

	info := &FileInfo{}
	err = serder.JSONToObject(metaData, info)
	if err != nil {
		return nil, err
	}

	return &CacheEntryInfo{
		PathComps: pathComps,
		Size:      info.Size,
		Mode:      info.Perm,
		ModTime:   info.ModTime,
		IsDir:     false,
	}, nil
}

// NOTE(review): the accessors below read f.info without taking rwLock —
// presumably acceptable for FUSE attr queries, but verify against callers.

func (f *CacheFile) PathComps() []string {
	return f.pathComps
}

func (f *CacheFile) Name() string {
	return f.name
}

func (f *CacheFile) Size() int64 {
	return f.info.Size
}

func (f *CacheFile) Mode() os.FileMode {
	return f.info.Perm
}

func (f *CacheFile) ModTime() time.Time {
	return f.info.ModTime
}

func (f *CacheFile) IsDir() bool {
	return false
}

// Info returns a snapshot of the file's metadata as a CacheEntryInfo.
func (f *CacheFile) Info() CacheEntryInfo {
	return CacheEntryInfo{
		PathComps: f.pathComps,
		Size:      f.info.Size,
		Mode:      f.info.Perm,
		ModTime:   f.info.ModTime,
		IsDir:     false,
	}
}

// Delete removes the on-disk meta and data files and marks the CacheFile as
// deleted so the savingMeta goroutine will not recreate the meta file (which
// could clobber a newly created file with the same name). Removal errors are
// ignored (best-effort).
func (f *CacheFile) Delete() {
	f.writeLock.Lock()
	defer f.writeLock.Unlock()

	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	metaPath := f.cache.GetCacheMetaPath(f.pathComps...)
	dataPath := f.cache.GetCacheDataPath(f.pathComps...)
	os.Remove(metaPath)
	os.Remove(dataPath)
	f.isDeleted = true
	// Wake savingMeta so it observes isDeleted and exits.
	f.letSave()
}

// Move updates the in-memory path of the cache file and schedules a metadata
// save. NOTE(review): only in-memory state changes here — the on-disk
// meta/data files are not renamed; presumably the caller handles that.
func (f *CacheFile) Move(newPathComps []string) {
	f.writeLock.Lock()
	defer f.writeLock.Unlock()

	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	f.pathComps = newPathComps
	f.name = newPathComps[len(newPathComps)-1]
	f.letSave()
}

// SetRemoteObject associates (or clears) the remote object backing this file.
// NOTE(review): written without holding rwLock — confirm callers serialize this.
func (f *CacheFile) SetRemoteObject(obj *cdssdk.Object) {
	f.remoteObj = obj
}

// Open creates a read/write handle on the cache file. Write-capable handles
// also support reads (O_RDWR).
func (f *CacheFile) Open(flags uint32) *CacheFileReadWriter {
	logger.Tracef("CacheFile.Open: %v, %#x", f.name, flags)

	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	h := &CacheFileReadWriter{
		file:       f,
		remoteLock: &sync.Mutex{},
	}
	// NOTE(review): os.O_RDONLY is 0, so the last condition is always true;
	// it only works because it is the final else-if (anything that is neither
	// RDWR nor WRONLY is treated as read-only). A plain `else` would say the
	// same thing honestly.
	if flags&uint32(os.O_RDWR) == uint32(os.O_RDWR) {
		h.readable = true
		h.writeable = true
	} else if flags&uint32(os.O_WRONLY) == uint32(os.O_WRONLY) {
		h.writeable = true
	} else if flags&uint32(os.O_RDONLY) == uint32(os.O_RDONLY) {
		h.readable = true
	}

	if f.remoteObj != nil {
		h.remote = newRemoteLoader(f)
	}

	if h.writeable {
		f.writers = append(f.writers, h)
	} else {
		f.readers = append(f.readers, h)
	}
	return h
}

// SetModTime updates the file's modification time and schedules a metadata save.
func (f *CacheFile) SetModTime(modTime time.Time) error {
	logger.Tracef("CacheFile.SetModTime: %v, %v", f.pathComps, modTime)

	f.rwLock.Lock()
	f.info.ModTime = modTime
	f.infoRev++
	f.rwLock.Unlock()
	f.letSave()
	return nil
}

// Truncate resizes the local data file and adjusts the metadata accordingly:
// shrinking clips the remote-loadable size and the segment list; growing adds
// a segment covering the new zero-filled tail.
func (f *CacheFile) Truncate(size int64) error {
	logger.Tracef("CacheFile.Truncate: %v, %v", f.pathComps, size)

	// No writes are allowed while the file size is being changed.
	f.writeLock.Lock()
	defer f.writeLock.Unlock()

	err := f.dataFile.Truncate(size)
	if err != nil {
		return err
	}

	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	// Clip how much data may still be downloaded from the remote side.
	f.info.ObjectSize = math2.Min(f.info.ObjectSize, size)
	// Adjust the extent of valid data recorded for the local cache file.
	if size < f.info.Size {
		f.info.Segments = TruncateRange(f.info.Segments, size)
	} else if size > f.info.Size {
		// The grown region is zero-filled by Truncate, so it counts as valid local data.
		f.info.Segments = AddRange(f.info.Segments, &Range{Position: f.info.Size, Length: size - f.info.Size})
	}

	if f.info.Size != size {
		f.info.Dirty = true
	}

	f.info.Size = size
	f.infoRev++
	f.letSave()
	return nil
}

// Stop using the cache file.
// func (f *CacheFile) Release() {

// }

// savingMeta runs in a dedicated goroutine and persists FileInfo to the meta
// file whenever infoRev has advanced past the last saved revision. It wakes on
// letSave signals or every 5 seconds, and exits when savingMetaChan is closed
// or the file is deleted.
func (f *CacheFile) savingMeta() {
	savedInfoRev := int64(0)
	ticker := time.NewTicker(time.Second * 5)
	defer ticker.Stop()

	for {
		select {
		case _, ok := <-f.savingMetaChan:
			if !ok {
				return
			}
		case <-ticker.C:
		}

		f.rwLock.Lock()
		// Once the file is deleted, metadata must not be saved again, to avoid
		// clobbering a newly created file with the same name.
		if f.isDeleted {
			f.rwLock.Unlock()
			break
		}

		// Single-pass "do once" loop: every failure path breaks out, keeping
		// savedInfoRev unchanged so the save is retried on the next wake-up.
		for {
			if f.infoRev == savedInfoRev {
				break
			}

			jsonData, err := serder.ObjectToJSON(f.info)
			if err != nil {
				// TODO logging
				break
			}

			err = f.metaFile.Truncate(0)
			if err != nil {
				// TODO logging
				break
			}

			_, err = f.metaFile.Seek(0, io.SeekStart)
			if err != nil {
				// TODO logging
				break
			}

			err = io2.WriteAll(f.metaFile, jsonData)
			if err != nil {
				// TODO logging
				break
			}

			savedInfoRev = f.infoRev
			break
		}
		f.rwLock.Unlock()
	}
}

// letSave wakes the savingMeta goroutine without blocking; if a wake-up is
// already pending in the buffered channel, the signal is dropped.
func (f *CacheFile) letSave() {
	select {
	case f.savingMetaChan <- nil:
	default:
	}
}

// CacheFileReadWriter is one open handle on a CacheFile. Handles share the
// underlying CacheFile; remoteLock serializes this handle's remote loads.
type CacheFileReadWriter struct {
	file      *CacheFile
	readable  bool
	writeable bool
	remote    *RemoteLoader
	remoteLock *sync.Mutex
}

// ReadAt fills buf starting at file offset off. Data already covered by a
// local segment is read from the data file; gaps are fetched from the remote
// loader, written back into the local cache, and committed to the segment
// list. Returns the number of bytes read; a short read means off+n reached
// the end of the file.
func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) {
	if !h.readable {
		return 0, fuse.ErrPermission
	}
	logger.Tracef("CacheFileReadWriter.ReadAt: %v, %v, %v", h.file.name, off, len(buf))

	totalReadLen := 0
	for totalReadLen < len(buf) {
		curBuf := buf[totalReadLen:]
		curOff := off + int64(totalReadLen)

		h.file.rwLock.RLock()
		if curOff >= h.file.info.Size {
			h.file.rwLock.RUnlock()
			break
		}

		// First try to read from the local cache file.
		rngIdx := FirstContainsIndex(h.file.info.Segments, curOff)
		if rngIdx >= 0 && h.file.info.Segments[rngIdx].End() > curOff {
			readLen := math2.Min(int64(len(curBuf)), h.file.info.Segments[rngIdx].End()-curOff)
			realReadLen, err := h.file.dataFile.ReadAt(curBuf[:readLen], curOff)
			totalReadLen += realReadLen
			h.file.rwLock.RUnlock()
			logger.Tracef("read from local cache, n: %v, err: %v", realReadLen, err)
			if err != nil {
				return totalReadLen, err
			}
			continue
		}

		// Otherwise download from the remote side.
		// NOTE(review): if curOff >= ObjectSize (a hole past the remote-backed
		// region), loadLen is <= 0 here and curBuf[:loadLen] below would panic
		// — confirm such holes cannot occur.
		loadLen := math2.Min(int64(len(curBuf)), h.file.info.ObjectSize-curOff)
		if rngIdx+1 < len(h.file.info.Segments) {
			// Load at most up to the start of the next segment.
			loadLen = math2.Min(loadLen, h.file.info.Segments[rngIdx+1].Position-curOff)
		}
		h.file.rwLock.RUnlock()

		if h.remote == nil {
			return totalReadLen, fmt.Errorf("no remote file")
		}

		// NOTE(review): leftover debug print — should probably be removed or
		// replaced with logger.Tracef.
		fmt.Printf("load from remote\n")

		// Lock to prevent concurrent Seek on the remote loader.
		h.remoteLock.Lock()
		realLoadLen, err := h.remote.Load(curBuf[:loadLen], curOff)
		totalReadLen += realLoadLen
		if err != nil {
			h.remoteLock.Unlock()
			return totalReadLen, err
		}
		h.remoteLock.Unlock()
		logger.Tracef("load from remote: %v", realLoadLen)

		// Stop other writers before writing back locally, to avoid conflicts.
		h.file.writeLock.Lock()

		// With other writers stopped, compute the portion of the loaded data
		// that is not already covered by existing segments.
		h.file.rwLock.RLock()
		loadRng := &Range{Position: curOff, Length: int64(realLoadLen)}
		DifferentRange(loadRng, h.file.info.Segments)
		h.file.rwLock.RUnlock()

		if loadRng.Length == 0 {
			h.file.writeLock.Unlock()
			continue
		}

		// Write the uncovered portion into the local cache file.
		// NOTE(review): the data slice starts at writeStart (file offset
		// loadRng.Position) but is written at curOff — should the write offset
		// be loadRng.Position when writeStart > 0? Verify against
		// DifferentRange's semantics.
		writeStart := loadRng.Position - curOff
		_, err = h.file.dataFile.WriteAt(curBuf[writeStart:writeStart+loadRng.Length], curOff)
		if err != nil {
			h.file.writeLock.Unlock()
			logger.Tracef("save to local file: %v", err)
			return totalReadLen, fmt.Errorf("save to local file: %w", err)
		}
		logger.Tracef("save to local: %v", loadRng.Length)
		h.file.writeLock.Unlock()

		// Commit the newly cached range to the segment list.
		h.file.rwLock.Lock()
		h.file.info.Segments = AddRange(h.file.info.Segments, loadRng)
		h.file.infoRev++
		h.file.letSave()
		h.file.rwLock.Unlock()
	}
	return totalReadLen, nil
}

// WriteAt writes buf to the local cache file at offset off, records the
// written span as a valid segment, grows Size if needed, and marks the file
// dirty. Multiple writers may run concurrently (writeLock read lock), but
// writes are excluded while data loading/truncation holds the write lock.
func (h *CacheFileReadWriter) WriteAt(buf []byte, off int64) (int, error) {
	if !h.writeable {
		return 0, fuse.ErrPermission
	}
	logger.Tracef("CacheFileReadWriter.WriteAt: %v, %v, %v", h.file.name, off, len(buf))

	// Multiple threads may write in parallel, but not while data is being loaded.
	h.file.writeLock.RLock()
	defer h.file.writeLock.RUnlock()

	// Write to the local cache file.
	writeLen, err := h.file.dataFile.WriteAt(buf, off)
	if err != nil {
		return writeLen, fmt.Errorf("save to local file: %w", err)
	}

	// Commit the written span to the segment list.
	h.file.rwLock.Lock()
	defer h.file.rwLock.Unlock()
	h.file.info.Segments = AddRange(h.file.info.Segments, &Range{Position: off, Length: int64(writeLen)})
	h.file.info.Size = math2.Max(h.file.info.Size, off+int64(writeLen))
	h.file.info.Dirty = true
	h.file.infoRev++
	h.file.letSave()
	return writeLen, nil
}

// Sync flushes the local data file to stable storage.
func (f *CacheFileReadWriter) Sync() error {
	return f.file.dataFile.Sync()
}

// Close flushes the data file, closes the remote loader if any, and removes
// this handle from its CacheFile's writers/readers list. The Sync error is
// intentionally ignored (best-effort flush); Close always returns nil.
func (f *CacheFileReadWriter) Close() error {
	f.Sync()

	if f.remote != nil {
		f.remote.Close()
	}

	f.file.rwLock.Lock()
	if f.writeable {
		f.file.writers = lo2.Remove(f.file.writers, f)
	} else if f.readable {
		f.file.readers = lo2.Remove(f.file.readers, f)
	}
	f.file.rwLock.Unlock()
	return nil
}