Browse Source

增加上传本地脏数据的机制

gitlink
Sydonian 11 months ago
parent
commit
cab8068a49
8 changed files with 511 additions and 136 deletions
  1. +35
    -4
      client2/internal/cmd/mount.go
  2. +6
    -0
      client2/internal/mount/config/config.go
  3. +3
    -2
      client2/internal/mount/mount.go
  4. +2
    -1
      client2/internal/mount/mount_win.go
  5. +273
    -19
      client2/internal/mount/vfs/cache/cache.go
  6. +165
    -106
      client2/internal/mount/vfs/cache/file.go
  7. +3
    -4
      client2/internal/mount/vfs/vfs.go
  8. +24
    -0
      common/pkgs/uploader/update.go

+ 35
- 4
client2/internal/cmd/mount.go View File

@@ -13,11 +13,13 @@ import (
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader/strategy"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
"gitlink.org.cn/cloudream/storage/common/pkgs/metacache"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool"
"gitlink.org.cn/cloudream/storage/common/pkgs/uploader"
)

func init() {
@@ -68,22 +70,36 @@ func mountCmd(mountPoint string, configPath string) {
hubMeta := metacacheHost.AddHubMeta()
conMeta := metacacheHost.AddConnectivity()

// 分布式锁
distlockSvc, err := distlock.NewService(&config.Cfg().DistLock)
if err != nil {
logger.Warnf("new distlock service failed, err: %s", err.Error())
os.Exit(1)
}
go serveDistLock(distlockSvc)

// 初始化下载策略选择器
strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta)

// 初始化下载器
dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgAgts, strgSel)

// 上传器
uploader := uploader.NewUploader(distlockSvc, &conCol, stgAgts, stgMeta)

db, err := db2.NewDB(&config.Cfg().DB)
if err != nil {
logger.Fatalf("new db2 failed, err: %s", err.Error())
}

mnt := mount.NewMount(&mntcfg.Config{
CacheDir: "./cache",
MountPoint: mountPoint,
AttrTimeout: time.Second * 5,
}, db, &dlder)
CacheDir: "./cache",
MountPoint: mountPoint,
AttrTimeout: time.Second * 5,
UploadPendingTime: time.Second * 10,
CacheActiveTime: time.Second * 10,
CacheExpireTime: time.Second * 60,
}, db, uploader, &dlder)

ch := mnt.Start()
for {
@@ -103,3 +119,18 @@ func mountCmd(mountPoint string, configPath string) {
}
}
}

// serveDistLock runs the distributed-lock service until it returns, logging
// the outcome, and then terminates the whole process.
func serveDistLock(svc *distlock.Service) {
	logger.Info("start serving distlock")

	if err := svc.Serve(); err != nil {
		logger.Errorf("distlock stopped with error: %s", err.Error())
	}

	logger.Info("distlock stopped")

	// TODO: for now the program is simply terminated when distlock stops.
	os.Exit(1)
}

+ 6
- 0
client2/internal/mount/config/config.go View File

@@ -8,4 +8,10 @@ type Config struct {
GID uint32 `json:"gid"`
UID uint32 `json:"uid"`
AttrTimeout time.Duration `json:"attrTimeout"`
// 被修改的文件在被上传到云端之前的等待时间,如果期间有任何读写操作,则重置等待时间
UploadPendingTime time.Duration `json:"uploadPendingTime"`
// 被加载到内存的缓存文件信息的过期时间,如果文件在此时间内没有被访问过,则从缓存中删除
CacheActiveTime time.Duration `json:"cacheActiveTime"`
// 缓存数据的过期时间,如果文件在此时间内没有被访问过,则从本地删除缓存数据
CacheExpireTime time.Duration `json:"cacheExpireTime"`
}

+ 3
- 2
client2/internal/mount/mount.go View File

@@ -12,6 +12,7 @@ import (
"gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
"gitlink.org.cn/cloudream/storage/common/pkgs/uploader"
)

type MountEvent interface {
@@ -33,8 +34,8 @@ type Mount struct {
fuse *fuse2.Fuse
}

func NewMount(cfg *config.Config, db *db2.DB, downloader *downloader.Downloader) *Mount {
vfs := vfs.NewVfs(cfg, db, downloader)
func NewMount(cfg *config.Config, db *db2.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Mount {
vfs := vfs.NewVfs(cfg, db, uploader, downloader)
fuse := fuse2.NewFuse(cfg, vfs)

return &Mount{


+ 2
- 1
client2/internal/mount/mount_win.go View File

@@ -10,6 +10,7 @@ import (
"gitlink.org.cn/cloudream/storage/client2/internal/mount/config"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
"gitlink.org.cn/cloudream/storage/common/pkgs/uploader"
)

type MountEvent interface {
@@ -28,7 +29,7 @@ type MountingFailedEvent struct {
type Mount struct {
}

func NewMount(cfg *config.Config, db *db2.DB, downloader *downloader.Downloader) *Mount {
func NewMount(cfg *config.Config, db *db2.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Mount {
return &Mount{}
}



+ 273
- 19
client2/internal/mount/vfs/cache/cache.go View File

@@ -2,19 +2,25 @@ package cache

import (
"errors"
"io"
"os"
"path/filepath"
"sync"
"syscall"
"time"

"github.com/inhies/go-bytesize"
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/trie"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/config"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
"gitlink.org.cn/cloudream/storage/common/pkgs/uploader"
)

type CacheEntry interface {
@@ -32,22 +38,25 @@ type CacheEntryInfo struct {
}

type Cache struct {
cfg *config.Config
db *db2.DB
uploader *uploader.Uploader
downloader *downloader.Downloader
cacheDataDir string
cacheMetaDir string
lock *sync.RWMutex
cacheDone chan any
activeCache *trie.Trie[*CacheFile]
freeCache []*CacheFile
}

func NewCache(db *db2.DB, downloader *downloader.Downloader, cacheDataDir, cacheMetaDir string) *Cache {
func NewCache(cfg *config.Config, db *db2.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Cache {
return &Cache{
cfg: cfg,
db: db,
uploader: uploader,
downloader: downloader,
cacheDataDir: cacheDataDir,
cacheMetaDir: cacheMetaDir,
cacheDataDir: filepath.Join(cfg.CacheDir, "data"),
cacheMetaDir: filepath.Join(cfg.CacheDir, "meta"),
lock: &sync.RWMutex{},
cacheDone: make(chan any),
activeCache: trie.NewTrie[*CacheFile](),
@@ -55,7 +64,7 @@ func NewCache(db *db2.DB, downloader *downloader.Downloader, cacheDataDir, cache
}

func (c *Cache) Start() {
go c.clearFreeCache()
go c.scanningCache()
}

func (c *Cache) Stop() {
@@ -130,13 +139,21 @@ func (c *Cache) CreateFile(pathComps []string) *CacheFile {
c.lock.Lock()
defer c.lock.Unlock()

node, ok := c.activeCache.WalkEnd(pathComps)
if ok && node.Value != nil {
node.Value.Delete()
if node.Value.state.uploading != nil {
node.Value.state.uploading.isDeleted = true
}
}

ch, err := createNewCacheFile(c, pathComps)
if err != nil {
logger.Warnf("create new cache file %v: %v", pathComps, err)
return nil
}

ch.refCount++
ch.state.refCount++
c.activeCache.CreateWords(pathComps).Value = ch

logger.Debugf("create new cache file %v", pathComps)
@@ -150,13 +167,21 @@ func (c *Cache) LoadFile(pathComps []string, obj *cdssdk.Object) *CacheFile {

node, ok := c.activeCache.WalkEnd(pathComps)
if ok && node.Value != nil {
if !node.Value.state.isLoaded {
err := node.Value.Load()
if err != nil {
logger.Warnf("load cache %v: %v", pathComps, err)
return nil
}
}

return node.Value
}

ch, err := loadCacheFile(c, pathComps)
if err == nil {
ch.remoteObj = obj
ch.refCount++
ch.state.refCount++
c.activeCache.CreateWords(pathComps).Value = ch

logger.Debugf("load cache %v", pathComps)
@@ -179,7 +204,7 @@ func (c *Cache) LoadFile(pathComps []string, obj *cdssdk.Object) *CacheFile {
return nil
}

ch.refCount++
ch.state.refCount++
c.activeCache.CreateWords(pathComps).Value = ch

logger.Debugf("create cache %v from object %v", pathComps, obj.ObjectID)
@@ -293,7 +318,9 @@ func (c *Cache) Remove(pathComps []string) error {

if node.Value != nil {
node.Value.Delete()
c.freeCache = lo2.Remove(c.freeCache, node.Value)
if node.Value.state.uploading != nil {
node.Value.state.uploading.isDeleted = true
}
}

node.RemoveSelf(true)
@@ -369,10 +396,26 @@ func (c *Cache) Move(pathComps []string, newPathComps []string) error {
return nil
}

func (c *Cache) clearFreeCache() {
// uploadingPackage groups all dirty cache files that belong to the same
// remote package (identified by bucket + package name), so they can be
// uploaded and committed in one batch by doUploading.
type uploadingPackage struct {
	bktName string             // bucket name (first path component of each file)
	pkgName string             // package name (second path component)
	pkg     cdssdk.Package     // filled in by doUploading after the DB lookup succeeds
	upObjs  []*uploadingObject // files queued for upload into this package
}

// uploadingObject tracks one cache file while its data is being uploaded.
type uploadingObject struct {
	pathComps []string             // snapshot of the file path when the scanner queued it
	cache     *CacheFile           // the cache entry being uploaded
	reader    *CacheFileReadWriter // handle opened via OpenReadWhenScanning; closed after upload
	isDeleted bool                 // set (under the cache lock) if the file is deleted mid-upload
	isSuccess bool                 // set once Upload of this object's data succeeded
}

func (c *Cache) scanningCache() {
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()

lastScanPath := []string{}
for {

select {
@@ -385,17 +428,228 @@ func (c *Cache) clearFreeCache() {
}

c.lock.Lock()
for i, ch := range c.freeCache {
if time.Since(ch.freeTime) > time.Second*30 {
ch.Free()
node, _ := c.activeCache.WalkEnd(ch.PathComps())
node.RemoveSelf(true)
c.freeCache[i] = nil

logger.Debugf("cache %v freed", ch.PathComps())

type packageFullName struct {
bktName string
pkgName string
}

uploadingPkgs := make(map[packageFullName]*uploadingPackage)

visitCnt := 0
visitBreak := false

node, _ := c.activeCache.WalkEnd(lastScanPath)
node.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl {
ch := node.Value
if ch == nil {
return trie.VisitContinue
}

if ch.state.refCount > 0 {
logger.Debugf("skip cache %v, refCount: %v", path, ch.state.refCount)
return trie.VisitContinue
}

visitCnt++
if ch.Revision() > 0 {
// 1. 本地缓存被修改了,如果一段时间内没有被使用,则进行上传

// 不存放在Package里的文件,不需要上传
if len(ch.pathComps) <= 2 {
return trie.VisitContinue
}

if time.Since(ch.state.freeTime) > c.cfg.UploadPendingTime && ch.state.uploading == nil {
fullName := packageFullName{ch.pathComps[0], ch.pathComps[1]}
pkg, ok := uploadingPkgs[fullName]
if !ok {
pkg = &uploadingPackage{
bktName: ch.pathComps[0],
pkgName: ch.pathComps[1],
}
uploadingPkgs[fullName] = pkg
}
obj := &uploadingObject{
pathComps: lo2.ArrayClone(ch.pathComps),
cache: ch,
reader: ch.OpenReadWhenScanning(),
}
pkg.upObjs = append(pkg.upObjs, obj)
ch.state.uploading = obj
}
} else if ch.state.isLoaded {
// 2. 本地缓存没有被修改,如果一段时间内没有被使用,则进行卸载

if time.Since(ch.state.freeTime) > c.cfg.CacheActiveTime {
ch.Unload()
ch.state.isLoaded = false
ch.state.unloadTime = time.Now()
}
} else {
// 3. 卸载后的缓存,如果一段时间内没有被使用,则进行删除。
// 能达到这个阶段,则肯定已经被同步到远端了

if time.Since(ch.state.unloadTime) > c.cfg.CacheExpireTime {
ch.Delete()
node.RemoveSelf(true)
}
}

// 每次最多遍历500个节点,防止占用锁太久
if visitCnt > 500 {
lastScanPath = lo2.ArrayClone(path)
visitBreak = true
return trie.VisitBreak
}
return trie.VisitContinue
})
if !visitBreak {
lastScanPath = []string{}
}

c.lock.Unlock()

if len(uploadingPkgs) > 0 {
go c.doUploading(lo.Values(uploadingPkgs))
}
}
}

func (c *Cache) doUploading(pkgs []*uploadingPackage) {
/// 1. 先查询每个Package的信息,如果不存在,则暂时不上传
var sucPkgs []*uploadingPackage
var failedPkgs []*uploadingPackage
for _, pkg := range pkgs {
// TODO 用户ID
p, err := c.db.Package().GetUserPackageByName(c.db.DefCtx(), 1, pkg.bktName, pkg.pkgName)
if err != nil {
logger.Warnf("get user package %v/%v: %v", pkg.bktName, pkg.pkgName, err)
failedPkgs = append(failedPkgs, pkg)
continue
}

pkg.pkg = p
sucPkgs = append(sucPkgs, pkg)
}

/// 2. 对于查询失败的Package,直接关闭文件,不进行上传
// 在锁的保护下取消上传状态
c.lock.Lock()
for _, pkg := range failedPkgs {
for _, obj := range pkg.upObjs {
obj.cache.state.uploading = nil
}
}
c.lock.Unlock()
// 关闭文件必须在锁外
for _, pkg := range failedPkgs {
for _, obj := range pkg.upObjs {
obj.reader.Close()
}
}

/// 3. 开始上传每个Package
for _, p := range sucPkgs {
uploader, err := c.uploader.BeginUpdate(1, p.pkg.PackageID, 0, nil, nil)
if err != nil {
logger.Warnf("begin update package %v/%v: %v", p.bktName, p.pkgName, err)
continue
}

upSuc := 0
upSucAmt := int64(0)
upFailed := 0
upStartTime := time.Now()

logger.Infof("begin uploading %v objects to package %v/%v", len(p.upObjs), p.bktName, p.pkgName)
for _, o := range p.upObjs {
rd := cacheFileReader{
rw: o.reader,
}

counter := io2.Counter(&rd)

err = uploader.Upload(cdssdk.JoinObjectPath(o.pathComps[2:]...), counter)
if err != nil {
logger.Warnf("upload object %v: %v", o.pathComps, err)
upFailed++
continue
}

o.isSuccess = true
upSuc++
upSucAmt += counter.Count()
}

// 在锁保护下登记上传结果
c.lock.Lock()

upCancel := 0
upRename := 0

// 检查是否有文件在上传期间发生了变化
var sucObjs []*uploadingObject
for _, o := range p.upObjs {
o.cache.state.uploading = nil
if !o.isSuccess {
continue
}

oldPath := cdssdk.JoinObjectPath(o.pathComps[2:]...)
if o.isDeleted {
uploader.CancelObject(oldPath)
upCancel++
continue
}

newPath := cdssdk.JoinObjectPath(o.cache.pathComps[2:]...)
if newPath != oldPath {
uploader.RenameObject(oldPath, newPath)
upRename++
}

sucObjs = append(sucObjs, o)
}
c.freeCache = lo2.RemoveAllDefault(c.freeCache)

_, err = uploader.Commit()
if err != nil {
logger.Warnf("commit update package %v/%v: %v", p.bktName, p.pkgName, err)
} else {
for _, obj := range sucObjs {
obj.cache.RevisionUploaded(obj.reader.revision)
}

upTime := time.Since(upStartTime)
logger.Infof("upload package %v/%v in %v, upload: %v, size: %v, speed: %v/s, cancel: %v, rename: %v",
p.bktName, p.pkgName, upTime, upSuc, upSucAmt, bytesize.New(float64(upSucAmt)/upTime.Seconds()), upCancel, upRename)
}

c.lock.Unlock()

// 在Cache锁以外关闭文件。
// 关闭文件会影响refCount,所以无论是上传失败还是上传成功,都会在等待一段时间后才进行下一阶段的操作
for _, obj := range p.upObjs {
obj.reader.Close()
}
}
}

// cacheFileReader adapts a random-access io.ReaderAt into a sequential
// io.Reader by tracking the current read position. It is used to stream a
// cache file's content to the uploader.
//
// Generalized from the concrete *CacheFileReadWriter to io.ReaderAt
// (*CacheFileReadWriter satisfies it), which keeps the adapter decoupled
// and independently testable.
type cacheFileReader struct {
	rw  io.ReaderAt // underlying random-access source (e.g. *CacheFileReadWriter)
	pos int64       // next offset to read from
}

// Read implements io.Reader. It reads at the current position, advances the
// position by the number of bytes actually read, and reports io.EOF when the
// underlying ReadAt returns fewer bytes than requested — by convention a
// short read of the cache file means the file has ended.
func (r *cacheFileReader) Read(p []byte) (int, error) {
	n, err := r.rw.ReadAt(p, r.pos)
	r.pos += int64(n)
	if err != nil {
		return n, err
	}

	if n != len(p) {
		return n, io.EOF
	}

	return n, nil
}

+ 165
- 106
client2/internal/mount/vfs/cache/file.go View File

@@ -20,8 +20,8 @@ type FileInfo struct {
// 文件总大小。可能会超过对应的远端文件的大小。
// 此大小可能与本地缓存文件大小也不同,需要定时将本地缓存文件大小修正到与这个值相同。
Size int64
// 本文件是否有未提交的修改
Dirty bool
// 如果大于0,则代表有未提交的修改
Revision int
// 数据段列表,按照段开始位置从小到大排列
Segments []*Range
// 文件对应的对象ID,仅在文件是一个缓存文件时才有值
@@ -80,24 +80,29 @@ func (r *Range) End() int64 {
type CacheFile struct {
cache *Cache
pathComps []string
name string
info FileInfo
remoteObj *cdssdk.Object
infoRev int64
rwLock *sync.RWMutex
readers []*CacheFileReadWriter
writers []*CacheFileReadWriter
saveMetaChan chan any
noSaveMeta bool // 防止在Unload之后又保存了文件
isDeleted bool
isFreed bool

metaFile *os.File
dataFile *os.File
writeLock *sync.RWMutex

// 下面的字段不受rwLock保护!
refCount int
freeTime time.Time
// 缓存文件的状态,用于管理缓存文件的生命周期。不受rwLock保护,而是由Cache管理
state cacheState
}

type cacheState struct {
refCount int
freeTime time.Time
unloadTime time.Time
isLoaded bool
uploading *uploadingObject
}

func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
@@ -105,9 +110,9 @@ func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
dataPath := cache.GetCacheDataPath(pathComps...)

info := FileInfo{
Dirty: true,
ModTime: time.Now(),
Perm: 0777,
Revision: 1,
ModTime: time.Now(),
Perm: 0777,
}

infoData, err := serder.ObjectToJSON(info)
@@ -135,16 +140,18 @@ func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
ch := &CacheFile{
cache: cache,
pathComps: pathComps,
name: pathComps[len(pathComps)-1],
info: info,
rwLock: &sync.RWMutex{},
saveMetaChan: make(chan any, 1),
metaFile: metaFile,
dataFile: dataFile,
writeLock: &sync.RWMutex{},
state: cacheState{
isLoaded: true,
},
}

go ch.serving()
go ch.serving(ch.saveMetaChan)

return ch, nil
}
@@ -176,16 +183,18 @@ func loadCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
ch := &CacheFile{
cache: cache,
pathComps: pathComps,
name: pathComps[len(pathComps)-1],
info: *info,
rwLock: &sync.RWMutex{},
saveMetaChan: make(chan any, 1),
metaFile: metaFile,
dataFile: dataFile,
writeLock: &sync.RWMutex{},
state: cacheState{
isLoaded: true,
},
}

go ch.serving()
go ch.serving(ch.saveMetaChan)

return ch, nil
}
@@ -226,7 +235,6 @@ func newCacheFileFromObject(cache *Cache, pathComps []string, obj *cdssdk.Object
ch := &CacheFile{
cache: cache,
pathComps: pathComps,
name: pathComps[len(pathComps)-1],
info: info,
remoteObj: obj,
rwLock: &sync.RWMutex{},
@@ -234,9 +242,12 @@ func newCacheFileFromObject(cache *Cache, pathComps []string, obj *cdssdk.Object
metaFile: metaFile,
dataFile: dataFile,
writeLock: &sync.RWMutex{},
state: cacheState{
isLoaded: true,
},
}

go ch.serving()
go ch.serving(ch.saveMetaChan)

return ch, nil
}
@@ -264,30 +275,73 @@ func loadCacheFileInfo(cache *Cache, pathComps []string) (*CacheEntryInfo, error
}, nil
}

func (f *CacheFile) PathComps() []string {
return f.pathComps
}
// 加载缓存文件。如果已经加载了,则无任何效果
func (f *CacheFile) Load() error {
f.rwLock.Lock()
defer f.rwLock.Unlock()

func (f *CacheFile) Name() string {
return f.name
}
if f.isDeleted {
return fmt.Errorf("cache deleted")
}

func (f *CacheFile) Size() int64 {
return f.info.Size
}
metaPath := f.cache.GetCacheMetaPath(f.pathComps...)
dataPath := f.cache.GetCacheDataPath(f.pathComps...)

metaFile, err := os.OpenFile(metaPath, os.O_RDWR, 0644)
if err != nil {
return err
}

func (f *CacheFile) Mode() os.FileMode {
return f.info.Perm
dataFile, err := os.OpenFile(dataPath, os.O_RDWR, 0644)
if err != nil {
metaFile.Close()
return err
}

f.saveMetaChan = make(chan any)
f.noSaveMeta = false
f.metaFile = metaFile
f.dataFile = dataFile

go f.serving(f.saveMetaChan)
return nil
}

func (f *CacheFile) ModTime() time.Time {
return f.info.ModTime
// 关闭缓存文件,保存元数据。但缓存对象依然会留在内存里,以备随时查询元数据。
//
// 只应该在引用计数为0时调用。
func (f *CacheFile) Unload() {
f.rwLock.Lock()
defer f.rwLock.Unlock()

if !f.isDeleted {
// TODO 日志
f.saveMeta()
}

// 防止在关闭缓存后又保存了文件
close(f.saveMetaChan)
f.saveMetaChan = nil
f.noSaveMeta = true
f.metaFile.Close()
f.dataFile.Close()
}

func (f *CacheFile) IsDir() bool {
return false
// RevisionUploaded records that revision rev was successfully synced to the
// remote. If the revision number is unchanged since the upload started (no
// new writes happened meanwhile), the file is marked clean again by
// resetting Revision to 0; otherwise the newer revision is kept dirty.
//
// Callable in the Unload state: the metadata save is only scheduled when the
// saving goroutine's channel is still open.
func (f *CacheFile) RevisionUploaded(rev int) {
	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	if f.info.Revision == rev {
		f.info.Revision = 0
	}

	if f.saveMetaChan != nil {
		f.letSave()
	}
}

// 可在Unload状态下调用
func (f *CacheFile) Info() CacheEntryInfo {
return CacheEntryInfo{
PathComps: f.pathComps,
@@ -298,6 +352,13 @@ func (f *CacheFile) Info() CacheEntryInfo {
}
}

// Revision returns the file's current modification revision. A value
// greater than 0 means the file has local changes not yet committed to the
// remote.
func (f *CacheFile) Revision() int {
	f.rwLock.RLock()
	defer f.rwLock.RUnlock()
	return f.info.Revision
}

// 可在Unload状态下调用
func (f *CacheFile) Delete() {
f.writeLock.Lock()
defer f.writeLock.Unlock()
@@ -309,11 +370,16 @@ func (f *CacheFile) Delete() {
dataPath := f.cache.GetCacheDataPath(f.pathComps...)
os.Remove(metaPath)
os.Remove(dataPath)

// 可能是在被使用状态下删除,也可能是在Unload状态下删除,所以这里不关闭saveMetaChan,而是设置isDeleted为true
f.isDeleted = true

f.letSave()
if f.saveMetaChan != nil {
f.letSave()
}
}

// 可在Unload状态下调用
func (f *CacheFile) Move(newPathComps []string) {
f.writeLock.Lock()
defer f.writeLock.Unlock()
@@ -322,31 +388,29 @@ func (f *CacheFile) Move(newPathComps []string) {
defer f.rwLock.Unlock()

f.pathComps = newPathComps
f.name = newPathComps[len(newPathComps)-1]

f.letSave()
if f.saveMetaChan != nil {
f.letSave()
}
}

// 打开一个写入句柄,同时支持读取
//
// 不可在Unload状态下调用!
func (f *CacheFile) Open(flags uint32) *CacheFileReadWriter {
logger.Tracef("CacheFile.Open: %v, %#x", f.name, flags)
logger.Tracef("CacheFile.Open: %v, %#x", f.pathComps, flags)

f.cache.lock.Lock()
f.state.refCount++
f.cache.lock.Unlock()

f.rwLock.Lock()
defer f.rwLock.Unlock()

f.refCount++
if f.refCount == 1 && !f.isDeleted {
f.cache.freeCache = lo2.Remove(f.cache.freeCache, f)
}

// 提前释放Cache的锁
f.cache.lock.Unlock()

h := &CacheFileReadWriter{
file: f,
remoteLock: &sync.Mutex{},
revision: f.info.Revision,
}

if flags&uint32(os.O_RDWR) == uint32(os.O_RDWR) {
@@ -371,18 +435,43 @@ func (f *CacheFile) Open(flags uint32) *CacheFileReadWriter {
return h
}

// OpenReadWhenScanning opens a read handle used to sync the local file to
// the remote. It is invoked while the cache scanner already holds the cache
// lock, so incrementing refCount here needs no extra locking.
//
// Must NOT be called while the file is in the Unload state.
func (f *CacheFile) OpenReadWhenScanning() *CacheFileReadWriter {
	f.rwLock.Lock()
	defer f.rwLock.Unlock()

	// Keep the file alive (not reclaimable by the scanner) while the upload
	// reads from it.
	f.state.refCount++

	h := &CacheFileReadWriter{
		file:       f,
		remoteLock: &sync.Mutex{},
		revision:   f.info.Revision, // snapshot of the revision at open time
		readable:   true,
	}

	// If the file is backed by a remote object, allow reads to fall back to
	// downloading segments that are missing locally.
	if f.remoteObj != nil {
		h.remote = newRemoteLoader(f)
	}

	f.readers = append(f.readers, h)
	return h
}

// 不可在Unload状态下调用!
func (f *CacheFile) SetModTime(modTime time.Time) error {
logger.Tracef("CacheFile.SetModTime: %v, %v", f.pathComps, modTime)

f.rwLock.Lock()
f.info.ModTime = modTime
f.infoRev++
f.rwLock.Unlock()

f.letSave()
return nil
}

// 不可在Unload状态下调用!
func (f *CacheFile) Truncate(size int64) error {
logger.Tracef("CacheFile.Truncate: %v, %v", f.pathComps, size)

@@ -408,55 +497,32 @@ func (f *CacheFile) Truncate(size int64) error {
f.info.Segments = AddRange(f.info.Segments, &Range{Position: f.info.Size, Length: size - f.info.Size})
}
if f.info.Size != size {
f.info.Dirty = true
f.info.Revision++
}
f.info.Size = size
f.infoRev++

f.letSave()
return nil
}

// 不再使用缓存文件
// 减少一个引用计数
func (f *CacheFile) Release() {
f.cache.lock.Lock()
defer f.cache.lock.Unlock()

f.refCount--
f.freeTime = time.Now()

f.rwLock.RLock()
defer f.rwLock.RUnlock()

if f.refCount == 0 && !f.isDeleted {
f.cache.freeCache = append(f.cache.freeCache, f)
f.state.refCount--
if f.state.refCount == 0 {
f.state.freeTime = time.Now()
}
}

func (f *CacheFile) Free() {
f.rwLock.Lock()
defer f.rwLock.Unlock()

if !f.isDeleted {
// TODO 日志
f.saveMeta()
}

// 防止在关闭缓存后又保存了文件
f.isFreed = true
f.metaFile.Close()
f.dataFile.Close()
close(f.saveMetaChan)
}

func (f *CacheFile) serving() {
savedInfoRev := int64(0)
func (f *CacheFile) serving(saveMetaChan chan any) {
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()

for {
select {
case _, ok := <-f.saveMetaChan:
case _, ok := <-saveMetaChan:
if !ok {
return
}
@@ -472,24 +538,15 @@ func (f *CacheFile) serving() {
break
}

// 如果缓存已经被释放,就不要再保存元数据了
if f.isFreed {
// 停止保存元数据的线程
if f.noSaveMeta {
f.rwLock.Unlock()
break
}

for {
if f.infoRev == savedInfoRev {
break
}

// TODO 错误日志
f.saveMeta()
f.metaFile.Sync()

savedInfoRev = f.infoRev
break
}
// TODO 错误日志
f.saveMeta()
f.metaFile.Sync()

f.rwLock.Unlock()
}
@@ -532,6 +589,7 @@ type CacheFileReadWriter struct {
writeable bool
remote *RemoteLoader
remoteLock *sync.Mutex
revision int // 打开文件时,文件的版本号
}

func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) {
@@ -539,8 +597,9 @@ func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) {
return 0, fuse.ErrPermission
}

logger.Tracef("CacheFileReadWriter.ReadAt: %v, %v, %v", h.file.name, off, len(buf))
logger.Tracef("CacheFileReadWriter.ReadAt: %v, %v, %v", h.file.pathComps, off, len(buf))

// 读取数据必须读满整个buf,否则就会被认为是文件已经结束了
totalReadLen := 0
for totalReadLen < len(buf) {
curBuf := buf[totalReadLen:]
@@ -553,7 +612,7 @@ func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) {
break
}

// 先尝试从本地缓存文件里读取
/// 1. 先尝试从本地缓存文件里读取
rngIdx := FirstContainsIndex(h.file.info.Segments, curOff)
if rngIdx >= 0 && h.file.info.Segments[rngIdx].End() > curOff {
readLen := math2.Min(int64(len(curBuf)), h.file.info.Segments[rngIdx].End()-curOff)
@@ -568,7 +627,7 @@ func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) {
continue
}

// 否则从远端下载
// 否则从远端下载,计算一下要加载的长度
loadLen := math2.Min(int64(len(curBuf)), h.file.info.ObjectSize-curOff)
if rngIdx+1 < len(h.file.info.Segments) {
// 最多加载到下一个段的开头
@@ -577,13 +636,16 @@ func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) {

h.file.rwLock.RUnlock()

/// 2. 开始从远端下载数据

if h.remote == nil {
return totalReadLen, fmt.Errorf("no remote file")
}

fmt.Printf("load from remote\n")

// 加锁,防止并发Seek
// 由于RemoteLoader的Load方法没有加锁,所以这里要加锁,防止并发Seek导致的问题
// 可以考虑在RemoteLoader里加锁,这样可以实现跨Writer共用Loader
h.remoteLock.Lock()
realLoadLen, err := h.remote.Load(curBuf[:loadLen], curOff)
totalReadLen += realLoadLen
@@ -592,9 +654,10 @@ func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) {
return totalReadLen, err
}
h.remoteLock.Unlock()

logger.Tracef("load from remote: %v", realLoadLen)

/// 3. 数据加载完毕,写入到本地文件

// 在写入到本地之前,先停止其他的写入,防止冲突
h.file.writeLock.Lock()

@@ -626,9 +689,8 @@ func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) {
// 提交到段列表里
h.file.rwLock.Lock()
h.file.info.Segments = AddRange(h.file.info.Segments, loadRng)
h.file.infoRev++
h.file.letSave()
h.file.rwLock.Unlock()
h.file.letSave()
}

return totalReadLen, nil
@@ -639,7 +701,7 @@ func (h *CacheFileReadWriter) WriteAt(buf []byte, off int64) (int, error) {
return 0, fuse.ErrPermission
}

logger.Tracef("CacheFileReadWriter.WriteAt: %v, %v, %v", h.file.name, off, len(buf))
logger.Tracef("CacheFileReadWriter.WriteAt: %v, %v, %v", h.file.pathComps, off, len(buf))

// 允许多线程并行写入,但在数据加载期间不能写入
h.file.writeLock.RLock()
@@ -657,8 +719,7 @@ func (h *CacheFileReadWriter) WriteAt(buf []byte, off int64) (int, error) {

h.file.info.Segments = AddRange(h.file.info.Segments, &Range{Position: off, Length: int64(writeLen)})
h.file.info.Size = math2.Max(h.file.info.Size, off+int64(writeLen))
h.file.info.Dirty = true
h.file.infoRev++
h.file.info.Revision++

h.file.letSave()

@@ -677,17 +738,15 @@ func (f *CacheFileReadWriter) Close() error {
}

f.file.cache.lock.Lock()
defer f.file.cache.lock.Unlock()
f.file.state.refCount--
if f.file.state.refCount == 0 {
f.file.state.freeTime = time.Now()
}
f.file.cache.lock.Unlock()

f.file.rwLock.Lock()
defer f.file.rwLock.Unlock()

f.file.refCount--
if f.file.refCount == 0 && !f.file.isDeleted {
f.file.cache.freeCache = append(f.file.cache.freeCache, f.file)
f.file.freeTime = time.Now()
}

if f.writeable {
f.file.writers = lo2.Remove(f.file.writers, f)
} else if f.readable {


+ 3
- 4
client2/internal/mount/vfs/vfs.go View File

@@ -1,13 +1,12 @@
package vfs

import (
"path/filepath"

"gitlink.org.cn/cloudream/storage/client2/internal/mount/config"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
"gitlink.org.cn/cloudream/storage/common/pkgs/uploader"
)

type Vfs struct {
@@ -16,11 +15,11 @@ type Vfs struct {
cache *cache.Cache
}

func NewVfs(cfg *config.Config, db *db2.DB, downloader *downloader.Downloader) *Vfs {
func NewVfs(cfg *config.Config, db *db2.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Vfs {
return &Vfs{
db: db,
config: cfg,
cache: cache.NewCache(db, downloader, filepath.Join(cfg.CacheDir, "data"), filepath.Join(cfg.CacheDir, "meta")),
cache: cache.NewCache(cfg, db, uploader, downloader),
}
}



+ 24
- 0
common/pkgs/uploader/update.go View File

@@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
@@ -84,6 +85,29 @@ func (w *UpdateUploader) Upload(pat string, stream io.Reader) error {
return nil
}

// CancelObject removes a previously uploaded object from the pending commit
// list. It must be called only after Upload has been called for that object.
func (w *UpdateUploader) CancelObject(path string) {
	w.lock.Lock()
	defer w.lock.Unlock()

	// Filter in place: keep every entry whose path differs from the one
	// being cancelled (all matching entries are dropped, as before).
	kept := w.successes[:0]
	for _, entry := range w.successes {
		if entry.Path != path {
			kept = append(kept, entry)
		}
	}
	w.successes = kept
}

// RenameObject changes the recorded path of an uploaded object. It must be
// called only after Upload has been called for that object. The new path is
// not checked for collisions — the caller must guarantee uniqueness.
func (w *UpdateUploader) RenameObject(path string, newPath string) {
	w.lock.Lock()
	defer w.lock.Unlock()

	// Only the first matching entry is renamed, same as the original.
	for i, entry := range w.successes {
		if entry.Path == path {
			w.successes[i].Path = newPath
			break
		}
	}
}

func (w *UpdateUploader) Commit() (UpdateResult, error) {
w.lock.Lock()
defer w.lock.Unlock()


Loading…
Cancel
Save