Browse Source

上传文件时自动创建Pacakge

gitlink
Sydonian 1 year ago
parent
commit
94ce6ff705
3 changed files with 47 additions and 8 deletions
  1. +28
    -0
      client/internal/db/package.go
  2. +18
    -7
      client/internal/mount/vfs/cache/cache.go
  3. +1
    -1
      client/internal/uploader/uploader.go

+ 28
- 0
client/internal/db/package.go View File

@@ -204,3 +204,31 @@ func (db *PackageDB) BatchAddPackageAccessStat(ctx SQLContext, entries []AddAcce

return nil
}

// 尝试创建指定名称的Bucket和Package,如果Bucket不存在,则创建Bucket,如果Package已存在,则直接返回已有的Package
func (db *PackageDB) TryCreateAll(ctx SQLContext, bktName string, pkgName string) (types.Package, error) {
bkt, err := db.Bucket().GetByName(ctx, bktName)
if err == gorm.ErrRecordNotFound {
bkt, err = db.Bucket().Create(ctx, bktName, time.Now())
if err != nil {
return types.Package{}, fmt.Errorf("create bucket: %w", err)
}
} else if err != nil {
return types.Package{}, fmt.Errorf("get bucket by name: %w", err)
}

pkg, err := db.GetByName(ctx, bkt.BucketID, pkgName)
if err == nil {
return pkg, nil
}
if err != gorm.ErrRecordNotFound {
return types.Package{}, fmt.Errorf("get package by name: %w", err)
}

pkg, err = db.Create(ctx, bkt.BucketID, pkgName)
if err != nil {
return types.Package{}, fmt.Errorf("create package: %w", err)
}

return pkg, nil
}

+ 18
- 7
client/internal/mount/vfs/cache/cache.go View File

@@ -15,7 +15,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/trie"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/lo2"
db2 "gitlink.org.cn/cloudream/storage2/client/internal/db"
"gitlink.org.cn/cloudream/storage2/client/internal/db"
"gitlink.org.cn/cloudream/storage2/client/internal/downloader"
"gitlink.org.cn/cloudream/storage2/client/internal/mount/config"
"gitlink.org.cn/cloudream/storage2/client/internal/mount/fuse"
@@ -39,7 +39,7 @@ type CacheEntryInfo struct {

type Cache struct {
cfg *config.Config
db *db2.DB
db *db.DB
uploader *uploader.Uploader
downloader *downloader.Downloader
cacheDataDir string
@@ -49,7 +49,7 @@ type Cache struct {
activeCache *trie.Trie[*CacheFile]
}

func NewCache(cfg *config.Config, db *db2.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Cache {
func NewCache(cfg *config.Config, db *db.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Cache {
return &Cache{
cfg: cfg,
db: db,
@@ -550,13 +550,13 @@ func (c *Cache) scanningCache() {
}

func (c *Cache) doUploading(pkgs []*uploadingPackage) {
/// 1. 先查询每个Package的信息,如果不存在,则暂时不上传
/// 1. 先尝试创建Package
var sucPkgs []*uploadingPackage
var failedPkgs []*uploadingPackage
for _, pkg := range pkgs {
p, err := c.db.Package().GetByFullName(c.db.DefCtx(), pkg.bktName, pkg.pkgName)
p, err := db.DoTx21(c.db, c.db.Package().TryCreateAll, pkg.bktName, pkg.pkgName)
if err != nil {
logger.Warnf("get package %v/%v: %v", pkg.bktName, pkg.pkgName, err)
logger.Warnf("try create package %v/%v: %v", pkg.bktName, pkg.pkgName, err)
failedPkgs = append(failedPkgs, pkg)
continue
}
@@ -565,7 +565,7 @@ func (c *Cache) doUploading(pkgs []*uploadingPackage) {
sucPkgs = append(sucPkgs, pkg)
}

/// 2. 对于查询失败的Package,直接关闭文件,不进行上传
/// 2. 对于创建失败的Package,直接关闭文件,不进行上传
// 在锁的保护下取消上传状态
c.lock.Lock()
for _, pkg := range failedPkgs {
@@ -586,6 +586,17 @@ func (c *Cache) doUploading(pkgs []*uploadingPackage) {
uploader, err := c.uploader.BeginUpdate(p.pkg.PackageID, 0, nil, nil)
if err != nil {
logger.Warnf("begin update package %v/%v: %v", p.bktName, p.pkgName, err)

// 取消上传状态
c.lock.Lock()
for _, obj := range p.upObjs {
obj.cache.state.uploading = nil
}
c.lock.Unlock()

for _, obj := range p.upObjs {
obj.reader.Close()
}
continue
}



+ 1
- 1
client/internal/uploader/uploader.go View File

@@ -73,7 +73,7 @@ func (u *Uploader) BeginUpdate(pkgID clitypes.PackageID, affinity clitypes.UserS
}

if len(uploadSpaces) == 0 {
return nil, fmt.Errorf("user no available storages")
return nil, fmt.Errorf("user no available userspaces")
}

loadToSpaces := make([]clitypes.UserSpaceDetail, len(loadTo))


Loading…
Cancel
Save