Browse Source

调试写入接口;支持删除

gitlink
Sydonian 1 year ago
parent
commit
1d017e2cad
12 changed files with 260 additions and 172 deletions
  1. +3
    -3
      client2/internal/mount/fuse/dir_node.go
  2. +2
    -11
      client2/internal/mount/fuse/file_node.go
  3. +2
    -2
      client2/internal/mount/fuse/types.go
  4. +1
    -0
      client2/internal/mount/mount.go
  5. +63
    -2
      client2/internal/mount/vfs/cache/cache.go
  6. +57
    -14
      client2/internal/mount/vfs/cache/file.go
  7. +40
    -43
      client2/internal/mount/vfs/fuse_bucket.go
  8. +24
    -24
      client2/internal/mount/vfs/fuse_dir.go
  9. +4
    -17
      client2/internal/mount/vfs/fuse_file.go
  10. +24
    -27
      client2/internal/mount/vfs/fuse_package.go
  11. +28
    -29
      client2/internal/mount/vfs/fuse_root.go
  12. +12
    -0
      common/pkgs/db2/package.go

+ 3
- 3
client2/internal/mount/fuse/dir_node.go View File

@@ -173,9 +173,9 @@ func (n *DirNode) Mkdir(ctx context.Context, name string, mode uint32, out *fuse
var _ = (fusefs.NodeMkdirer)((*DirNode)(nil))

func (n *DirNode) Create(ctx context.Context, name string, flags uint32, mode uint32, out *fuse.EntryOut) (node *fusefs.Inode, fh fusefs.FileHandle, fuseFlags uint32, errno syscall.Errno) {
logger.Tracef("DirNode.Create: %v, %v, %#o, %#o", n.dir.Name(), name, flags, mode)
logger.Tracef("DirNode.Create: %v, %v, %#x, %#o", n.dir.Name(), name, flags, mode)

hd, err := n.dir.NewFile(ctx, name, flags)
hd, flags, err := n.dir.NewFile(ctx, name, flags)
if err != nil {
return nil, nil, 0, translateError(err)
}
@@ -185,7 +185,7 @@ func (n *DirNode) Create(ctx context.Context, name string, flags uint32, mode ui
fileNode := newFileNode(n.fs, hd.Entry())
return n.NewInode(ctx, fileNode, fusefs.StableAttr{
Mode: out.Attr.Mode,
}), hd, 0, 0
}), newFileHandle(hd), 0, 0
}

var _ = (fusefs.NodeCreater)((*DirNode)(nil))


+ 2
- 11
client2/internal/mount/fuse/file_node.go View File

@@ -62,21 +62,12 @@ var _ = (fusefs.NodeSetattrer)((*FileNode)(nil))
func (n *FileNode) Open(ctx context.Context, flags uint32) (fh fusefs.FileHandle, fuseFlags uint32, errno syscall.Errno) {
logger.Tracef("FileNode.Open: %v, %#o", n.file.Name(), flags)

// 只支持以下标志:
// os.O_RDONLY
// os.O_WRONLY
// os.O_RDWR
// os.O_APPEND
// os.O_TRUNC
// os.O_SYNC
// 忽略其他标志位

hd, err := n.file.Open(flags)
hd, flags, err := n.file.Open(flags)
if err != nil {
return nil, 0, translateError(err)
}

return newFileHandle(hd), 0, 0
return newFileHandle(hd), flags, 0
}

var _ = (fusefs.NodeOpener)((*FileNode)(nil))


+ 2
- 2
client2/internal/mount/fuse/types.go View File

@@ -41,7 +41,7 @@ type FsDir interface {
Children(ctx context.Context) ([]FsEntry, error)
ReadChildren() (DirReader, error)
NewDir(ctx context.Context, name string) (FsDir, error)
NewFile(ctx context.Context, name string, flags uint32) (FileHandle, error)
NewFile(ctx context.Context, name string, flags uint32) (FileHandle, uint32, error)
RemoveChild(ctx context.Context, name string) error
MoveChild(ctx context.Context, oldName string, newName string, newParent FsDir) error
}
@@ -51,7 +51,7 @@ type FsFile interface {

Truncate(size uint64) error
SetModTime(time time.Time) error
Open(flags uint32) (FileHandle, error)
Open(flags uint32) (FileHandle, uint32, error)
}

type DirReader interface {


+ 1
- 0
client2/internal/mount/mount.go View File

@@ -51,6 +51,7 @@ func (m *Mount) Start() *sync2.UnboundChannel[MountEvent] {
nodeFsOpt := &fusefs.Options{
MountOptions: fuse.MountOptions{
FsName: "CDS",
// Debug: true,
},
}
rawFs := fusefs.NewNodeFS(m.fuse.Root(), nodeFsOpt)


+ 63
- 2
client2/internal/mount/vfs/cache/cache.go View File

@@ -1,12 +1,15 @@
package cache

import (
"errors"
"fmt"
"os"
"path/filepath"
"syscall"
"time"

"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/trie"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse"
@@ -35,6 +38,7 @@ type Cache struct {
downloader *downloader.Downloader
cacheDataDir string
cacheMetaDir string
activeCache *trie.Trie[*CacheFile]
}

func NewCache(db *db2.DB, downloader *downloader.Downloader, cacheDataDir, cacheMetaDir string) *Cache {
@@ -43,6 +47,7 @@ func NewCache(db *db2.DB, downloader *downloader.Downloader, cacheDataDir, cache
downloader: downloader,
cacheDataDir: cacheDataDir,
cacheMetaDir: cacheMetaDir,
activeCache: trie.NewTrie[*CacheFile](),
}
}

@@ -76,6 +81,12 @@ func (c *Cache) GetCacheMetaPathComps(comps ...string) []string {

// 获取指定位置的缓存条目信息。如果路径不存在,则返回nil。
func (c *Cache) Stat(pathComps []string) *CacheEntryInfo {
node, ok := c.activeCache.WalkEnd(pathComps)
if ok && node.Value != nil {
info := node.Value.Info()
return &info
}

metaPath := c.GetCacheMetaPath(pathComps...)
stat, err := os.Stat(metaPath)
if err != nil {
@@ -107,14 +118,22 @@ func (c *Cache) CreateFile(pathComps []string) *CacheFile {
// TODO 日志记录
return nil
}

c.activeCache.CreateWords(pathComps).Value = ch
return ch
}

// 尝试加载缓存文件,如果文件不存在,则使用obj的信息创建一个新缓存文件,而如果obj为nil,那么会返回nil。
func (c *Cache) LoadFile(pathComps []string, obj *cdssdk.Object) *CacheFile {
node, ok := c.activeCache.WalkEnd(pathComps)
if ok && node.Value != nil {
return node.Value
}

ch, err := loadCacheFile(c, pathComps)
if err == nil {
ch.remoteObj = obj
c.activeCache.CreateWords(pathComps).Value = ch
return ch
}

@@ -134,6 +153,8 @@ func (c *Cache) LoadFile(pathComps []string, obj *cdssdk.Object) *CacheFile {
logger.Tracef("make cache file from object: %v", err)
return nil
}

c.activeCache.CreateWords(pathComps).Value = ch
return ch
}

@@ -178,13 +199,29 @@ func (c *Cache) LoadDir(pathComps []string, createOpt *CreateDirOption) *CacheDi

// 加载指定路径下的所有缓存条目信息
func (c *Cache) StatMany(pathComps []string) []CacheEntryInfo {
var infos []CacheEntryInfo

exists := make(map[string]bool)

node, ok := c.activeCache.WalkEnd(pathComps)
if ok {
for name, child := range node.WordNexts {
if child.Value != nil {
infos = append(infos, child.Value.Info())
exists[name] = true
}
}
}

osEns, err := os.ReadDir(c.GetCacheMetaPath(pathComps...))
if err != nil {
return nil
}

var infos []CacheEntryInfo
for _, e := range osEns {
if exists[e.Name()] {
continue
}

if e.IsDir() {
info, err := loadCacheDirInfo(c, append(lo2.ArrayClone(pathComps), e.Name()))
@@ -208,7 +245,31 @@ func (c *Cache) StatMany(pathComps []string) []CacheEntryInfo {

// 删除指定路径的缓存文件或目录。删除目录时如果目录不为空,则会报错。
func (c *Cache) Remove(pathComps []string) error {
return fmt.Errorf("not implemented")
node, ok := c.activeCache.WalkEnd(pathComps)
if ok {
if len(node.WordNexts) > 0 {
return fuse.ErrNotEmpty
}

if node.Value != nil {
node.Value.Delete()
}

node.RemoveSelf(true)
return nil
}

metaPath := c.GetCacheMetaPath(pathComps...)
err := os.Remove(metaPath)
if err == nil || os.IsNotExist(err) {
return nil
}

if errors.Is(err, syscall.ENOTEMPTY) {
return fuse.ErrNotEmpty
}

return err
}

// 移动指定路径的缓存文件或目录到新的路径。如果目标路径已经存在,则会报错。


+ 57
- 14
client2/internal/mount/vfs/cache/file.go View File

@@ -86,6 +86,7 @@ type CacheFile struct {
readers []*CacheFileReadWriter
writers []*CacheFileReadWriter
backgroundChan chan any
isDeleted bool

localFile *os.File
writeLock *sync.RWMutex
@@ -98,7 +99,7 @@ func createNewCacheFile(cache *Cache, pathComps []string) (*CacheFile, error) {
info := FileInfo{
Dirty: true,
ModTime: time.Now(),
Perm: 0644,
Perm: 0777,
}

infoData, err := serder.ObjectToJSON(info)
@@ -268,37 +269,61 @@ func (f *CacheFile) Info() CacheEntryInfo {
}
}

func (f *CacheFile) Delete() {
f.writeLock.Lock()
defer f.writeLock.Unlock()

f.rwLock.Lock()
defer f.rwLock.Unlock()

metaPath := f.cache.GetCacheMetaPath(f.pathComps...)
dataPath := f.cache.GetCacheDataPath(f.pathComps...)
os.Remove(metaPath)
os.Remove(dataPath)
f.isDeleted = true
}

func (f *CacheFile) SetRemoteObject(obj *cdssdk.Object) {
f.remoteObj = obj
}

// 打开一个写入句柄,同时支持读取
func (f *CacheFile) Open(readOnly bool) *CacheFileReadWriter {
logger.Tracef("CacheFile.Open: %v, %v", f.name, readOnly)
func (f *CacheFile) Open(flags uint32) *CacheFileReadWriter {
logger.Tracef("CacheFile.Open: %v, %#x", f.name, flags)

f.rwLock.Lock()
defer f.rwLock.Unlock()

h := &CacheFileReadWriter{
file: f,
readOnly: readOnly,
remoteLock: &sync.Mutex{},
}

if flags&uint32(os.O_RDWR) == uint32(os.O_RDWR) {
h.readable = true
h.writeable = true
} else if flags&uint32(os.O_WRONLY) == uint32(os.O_WRONLY) {
h.writeable = true
} else if flags&uint32(os.O_RDONLY) == uint32(os.O_RDONLY) {
h.readable = true
}

if f.remoteObj != nil {
h.remote = newRemoteLoader(f)
}

if readOnly {
f.readers = append(f.readers, h)
} else {
if h.writeable {
f.writers = append(f.writers, h)
} else {
f.readers = append(f.readers, h)
}

return h
}

func (f *CacheFile) SetModTime(modTime time.Time) error {
logger.Tracef("CacheFile.SetModTime: %v, %v", f.pathComps, modTime)

f.rwLock.Lock()
f.info.ModTime = modTime
f.infoRev++
@@ -309,6 +334,8 @@ func (f *CacheFile) SetModTime(modTime time.Time) error {
}

func (f *CacheFile) Truncate(size int64) error {
logger.Tracef("CacheFile.Truncate: %v, %v", f.pathComps, size)

// 修改文件大小前不允许写入
f.writeLock.Lock()
defer f.writeLock.Unlock()
@@ -327,9 +354,12 @@ func (f *CacheFile) Truncate(size int64) error {
// 调整本地缓存文件里的有效数据大小
if size < f.info.Size {
f.info.Segments = TruncateRange(f.info.Segments, size)
} else {
} else if size > f.info.Size {
f.info.Segments = AddRange(f.info.Segments, &Range{Position: f.info.Size, Length: size - f.info.Size})
}
if f.info.Size != size {
f.info.Dirty = true
}
f.info.Size = size
f.infoRev++

@@ -359,6 +389,12 @@ func (f *CacheFile) background() {

f.rwLock.Lock()

// 如果文件已被删除,则不能再保存元数据,防止覆盖掉新创建的同名文件
if f.isDeleted {
f.rwLock.Unlock()
continue
}

for {
if f.infoRev == savedInfoRev {
break
@@ -393,12 +429,17 @@ func (f *CacheFile) notifyBackground() {

type CacheFileReadWriter struct {
file *CacheFile
readOnly bool
readable bool
writeable bool
remote *RemoteLoader
remoteLock *sync.Mutex
}

func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) {
if !h.readable {
return 0, fuse.ErrPermission
}

logger.Tracef("CacheFileReadWriter.ReadAt: %v, %v, %v", h.file.name, off, len(buf))

totalReadLen := 0
@@ -495,7 +536,7 @@ func (h *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) {
}

func (h *CacheFileReadWriter) WriteAt(buf []byte, off int64) (int, error) {
if h.readOnly {
if !h.writeable {
return 0, fuse.ErrPermission
}

@@ -532,13 +573,15 @@ func (f *CacheFileReadWriter) Sync() error {
func (f *CacheFileReadWriter) Close() error {
f.Sync()

f.remote.Close()
if f.remote != nil {
f.remote.Close()
}

f.file.rwLock.Lock()
if f.readOnly {
f.file.readers = lo2.Remove(f.file.readers, f)
} else {
if f.writeable {
f.file.writers = lo2.Remove(f.file.writers, f)
} else if f.readable {
f.file.readers = lo2.Remove(f.file.readers, f)
}
f.file.rwLock.Unlock()
return nil


+ 40
- 43
client2/internal/mount/vfs/fuse_bucket.go View File

@@ -15,24 +15,24 @@ import (

type FuseBucket struct {
vfs *Vfs
name string
bktName string
modTime time.Time
}

func newBucketFromCache(c cache.CacheEntryInfo, vfs *Vfs) fuse.FsDir {
return &FuseBucket{
vfs: vfs,
name: c.PathComps[len(c.PathComps)-1],
bktName: c.PathComps[len(c.PathComps)-1],
modTime: c.ModTime,
}
}

func (r *FuseBucket) PathComps() []string {
return []string{r.name}
return []string{r.bktName}
}

func (r *FuseBucket) Name() string {
return r.name
return r.bktName
}

func (r *FuseBucket) Size() int64 {
@@ -62,13 +62,13 @@ func (r *FuseBucket) SetModTime(time time.Time) error {

// 如果不存在,应该返回ErrNotExists
func (r *FuseBucket) Child(ctx context.Context, name string) (fuse.FsEntry, error) {
childPathComps := []string{r.name, name}
childPathComps := []string{r.bktName, name}
ca := r.vfs.cache.Stat(childPathComps)

if ca == nil {
// TODO UserID

pkg, err := r.vfs.db.Package().GetUserPackageByName(r.vfs.db.DefCtx(), 1, r.name, name)
pkg, err := r.vfs.db.Package().GetUserPackageByName(r.vfs.db.DefCtx(), 1, r.bktName, name)
if err == nil {
dir := r.vfs.cache.LoadDir(childPathComps, &cache.CreateDirOption{
ModTime: pkg.CreateTime,
@@ -110,9 +110,9 @@ func (r *FuseBucket) ReadChildren() (fuse.DirReader, error) {
func (r *FuseBucket) listChildren() ([]fuse.FsEntry, error) {
var ens []fuse.FsEntry

infos := r.vfs.cache.StatMany([]string{r.name})
infos := r.vfs.cache.StatMany([]string{r.bktName})

pkgs, err := r.vfs.db.Package().GetBucketPackagesByName(r.vfs.db.DefCtx(), r.name)
pkgs, err := r.vfs.db.Package().GetBucketPackagesByName(r.vfs.db.DefCtx(), r.bktName)
if err != nil {
return nil, err
}
@@ -134,7 +134,7 @@ func (r *FuseBucket) listChildren() ([]fuse.FsEntry, error) {
}

for _, pkg := range pkgMap {
dir := r.vfs.cache.LoadDir([]string{r.name, pkg.Name}, &cache.CreateDirOption{
dir := r.vfs.cache.LoadDir([]string{r.bktName, pkg.Name}, &cache.CreateDirOption{
ModTime: pkg.CreateTime,
})
if dir == nil {
@@ -148,7 +148,7 @@ func (r *FuseBucket) listChildren() ([]fuse.FsEntry, error) {
}

func (r *FuseBucket) NewDir(ctx context.Context, name string) (fuse.FsDir, error) {
cache := r.vfs.cache.CreateDir([]string{r.name, name})
cache := r.vfs.cache.CreateDir([]string{r.bktName, name})
if cache == nil {
return nil, fuse.ErrPermission
}
@@ -158,7 +158,7 @@ func (r *FuseBucket) NewDir(ctx context.Context, name string) (fuse.FsDir, error
// 不关注创建是否成功,仅尝试一下
r.vfs.db.DoTx(func(tx db2.SQLContext) error {
db := r.vfs.db
bkt, err := db.Bucket().GetByName(tx, name)
bkt, err := db.Bucket().GetByName(tx, r.bktName)
if err != nil {
return fmt.Errorf("get bucket: %v", err)
}
@@ -174,51 +174,48 @@ func (r *FuseBucket) NewDir(ctx context.Context, name string) (fuse.FsDir, error
return newPackageFromCache(cache.Info(), r.vfs), nil
}

func (r *FuseBucket) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, error) {
cache := r.vfs.cache.CreateFile([]string{r.name, name})
func (r *FuseBucket) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, uint32, error) {
cache := r.vfs.cache.CreateFile([]string{r.bktName, name})
if cache == nil {
return nil, fuse.ErrPermission
return nil, 0, fuse.ErrPermission
}
// Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭,
// 也有有FileHandle的计数保持cache的有效性

fileNode := newFileFromCache(cache.Info(), r.vfs)

if flags&uint32(os.O_WRONLY) != 0 {
hd := cache.Open(false)
return newFileHandle(fileNode, hd), nil
}

if flags&uint32(os.O_RDONLY) != 0 {
hd := cache.Open(true)
return newFileHandle(fileNode, hd), nil
}

return nil, fuse.ErrPermission
hd := cache.Open(flags)
return newFileHandle(fileNode, hd), flags, nil
}

func (r *FuseBucket) RemoveChild(ctx context.Context, name string) error {
err := r.vfs.cache.Remove([]string{r.name, name})
if err != nil {
return err
}

// TODO 生成系统事件
// 不关心是否成功
r.vfs.db.DoTx(func(tx db2.SQLContext) error {
return r.vfs.db.DoTx(func(tx db2.SQLContext) error {
d := r.vfs.db

pkg, err := d.Package().GetUserPackageByName(tx, 1, r.name, name)
if err != nil {
if err == gorm.ErrRecordNotFound {
return nil
pkg, err := d.Package().GetUserPackageByName(tx, 1, r.bktName, name)
if err == nil {
has, err := d.Object().HasObjectWithPrefix(tx, pkg.PackageID, "")
if err != nil {
return err
}
if has {
return fuse.ErrNotEmpty
}
} else if err != gorm.ErrRecordNotFound {
return err
}

return d.Package().DeleteComplete(tx, pkg.PackageID)
})
err = r.vfs.cache.Remove([]string{r.bktName, name})
if err != nil {
return err
}

return nil
if pkg.PackageID != 0 {
d.Package().DeleteComplete(tx, pkg.PackageID)
}

return nil
})
}

func (r *FuseBucket) MoveChild(ctx context.Context, oldName string, newName string, newParent fuse.FsDir) error {
@@ -226,7 +223,7 @@ func (r *FuseBucket) MoveChild(ctx context.Context, oldName string, newName stri
// TODO 有问题

newParentNode := newParent.(FuseNode)
_, err := r.vfs.cache.Move([]string{r.name, oldName}, append(newParentNode.PathComps(), newName))
_, err := r.vfs.cache.Move([]string{r.bktName, oldName}, append(newParentNode.PathComps(), newName))
if err != nil {
return err
}
@@ -236,14 +233,14 @@ func (r *FuseBucket) MoveChild(ctx context.Context, oldName string, newName stri

func (r *FuseBucket) loadCacheDir() *cache.CacheDir {
var createOpt *cache.CreateDirOption
bkt, err := r.vfs.db.Bucket().GetByName(r.vfs.db.DefCtx(), r.name)
bkt, err := r.vfs.db.Bucket().GetByName(r.vfs.db.DefCtx(), r.bktName)
if err == nil {
createOpt = &cache.CreateDirOption{
ModTime: bkt.CreateTime,
}
}

return r.vfs.cache.LoadDir([]string{r.name}, createOpt)
return r.vfs.cache.LoadDir([]string{r.bktName}, createOpt)
}

var _ fuse.FsDir = (*FuseBucket)(nil)


+ 24
- 24
client2/internal/mount/vfs/fuse_dir.go View File

@@ -210,49 +210,49 @@ func (r *FuseDir) NewDir(ctx context.Context, name string) (fuse.FsDir, error) {
return newDirFromCache(cache.Info(), r.vfs), nil
}

func (r *FuseDir) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, error) {
func (r *FuseDir) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, uint32, error) {
cache := r.vfs.cache.CreateFile(append(lo2.ArrayClone(r.pathComps), name))
if cache == nil {
return nil, fuse.ErrPermission
return nil, 0, fuse.ErrPermission
}
// Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭,
// 也有有FileHandle的计数保持cache的有效性

fileNode := newFileFromCache(cache.Info(), r.vfs)

if flags&uint32(os.O_WRONLY) != 0 {
hd := cache.Open(false)
return newFileHandle(fileNode, hd), nil
}

if flags&uint32(os.O_RDONLY) != 0 {
hd := cache.Open(true)
return newFileHandle(fileNode, hd), nil
}

return nil, fuse.ErrPermission
hd := cache.Open(flags)
return newFileHandle(fileNode, hd), flags, nil
}

func (r *FuseDir) RemoveChild(ctx context.Context, name string) error {
pathComps := append(lo2.ArrayClone(r.pathComps), name)
err := r.vfs.cache.Remove(pathComps)
if err != nil {
return err
}
joinedPath := cdssdk.JoinObjectPath(pathComps[2:]...)
d := r.vfs.db

// TODO 生成系统事件
// 不关心是否成功
r.vfs.db.DoTx(func(tx db2.SQLContext) error {
d := r.vfs.db
return r.vfs.db.DoTx(func(tx db2.SQLContext) error {
pkg, err := d.Package().GetUserPackageByName(tx, 1, pathComps[0], pathComps[1])
if err == nil {
has, err := d.Object().HasObjectWithPrefix(tx, pkg.PackageID, joinedPath+cdssdk.ObjectPathSeparator)
if err != nil {
return err
}
if has {
return fuse.ErrNotEmpty
}
}

err = r.vfs.cache.Remove(pathComps)
if err != nil {
return err
}

return d.Object().DeleteByPath(tx, pkg.PackageID, cdssdk.JoinObjectPath(pathComps[2:]...))
})
if pkg.PackageID > 0 {
// 存储系统不会保存目录结构,所以这里是尝试删除同名文件
d.Object().DeleteByPath(tx, pkg.PackageID, joinedPath)
}

return nil
return nil
})
}

func (r *FuseDir) MoveChild(ctx context.Context, oldName string, newName string, newParent fuse.FsDir) error {


+ 4
- 17
client2/internal/mount/vfs/fuse_file.go View File

@@ -1,7 +1,6 @@
package vfs

import (
"fmt"
"os"
"time"

@@ -77,35 +76,24 @@ func (n *FuseFileNode) SetModTime(time time.Time) error {
return cacheFile.SetModTime(time)
}

func (n *FuseFileNode) Open(flags uint32) (fuse.FileHandle, error) {
func (n *FuseFileNode) Open(flags uint32) (fuse.FileHandle, uint32, error) {
cacheFile := n.loadCacheFile()
if cacheFile == nil {
// 如果文件不存在,也不进行创建,因为创建不应该调用这个接口
return nil, fuse.ErrNotExists
return nil, 0, fuse.ErrNotExists
}

if flags&uint32(os.O_WRONLY) == uint32(os.O_WRONLY) {
hd := cacheFile.Open(true)
return newFileHandle(n, hd), nil
}

if flags&uint32(os.O_RDONLY) == uint32(os.O_RDONLY) {
hd := cacheFile.Open(false)
return newFileHandle(n, hd), nil
}

return nil, fuse.ErrPermission
hd := cacheFile.Open(flags)
return newFileHandle(n, hd), flags, nil
}

func (n *FuseFileNode) loadCacheFile() *cache.CacheFile {
fmt.Printf("path: %v\n", n.pathComps)
if len(n.pathComps) <= 2 {
return n.vfs.cache.LoadFile(n.pathComps, nil)
}

cdsObj, err := n.vfs.db.Object().GetByFullPath(n.vfs.db.DefCtx(), n.pathComps[0], n.pathComps[1], cdssdk.JoinObjectPath(n.pathComps[2:]...))
if err == nil {
fmt.Printf("obj: %v\n", cdsObj)
file := n.vfs.cache.LoadFile(n.pathComps, &cdsObj)
if file == nil {
return nil
@@ -115,7 +103,6 @@ func (n *FuseFileNode) loadCacheFile() *cache.CacheFile {
}

if err == gorm.ErrRecordNotFound {
fmt.Printf("not found\n")
return n.vfs.cache.LoadFile(n.pathComps, nil)
}



+ 24
- 27
client2/internal/mount/vfs/fuse_package.go View File

@@ -205,52 +205,49 @@ func (r *FusePackage) NewDir(ctx context.Context, name string) (fuse.FsDir, erro
return nil, fuse.ErrPermission
}

return newPackageFromCache(cache.Info(), r.vfs), nil
return newDirFromCache(cache.Info(), r.vfs), nil
}

func (r *FusePackage) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, error) {
func (r *FusePackage) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, uint32, error) {
cache := r.vfs.cache.CreateFile([]string{r.bktName, r.pkgName, name})
if cache == nil {
return nil, fuse.ErrPermission
return nil, 0, fuse.ErrPermission
}
// Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭,
// 也有有FileHandle的计数保持cache的有效性

fileNode := newFileFromCache(cache.Info(), r.vfs)

if flags&uint32(os.O_WRONLY) != 0 {
hd := cache.Open(false)
return newFileHandle(fileNode, hd), nil
}

if flags&uint32(os.O_RDONLY) != 0 {
hd := cache.Open(true)
return newFileHandle(fileNode, hd), nil
}

return nil, fuse.ErrPermission
hd := cache.Open(flags)
return newFileHandle(fileNode, hd), flags, nil
}

func (r *FusePackage) RemoveChild(ctx context.Context, name string) error {
err := r.vfs.cache.Remove([]string{r.bktName, r.pkgName, name})
if err != nil {
return err
}

// TODO 生成系统事件
// 不关心是否成功
r.vfs.db.DoTx(func(tx db2.SQLContext) error {
d := r.vfs.db

d := r.vfs.db
return r.vfs.db.DoTx(func(tx db2.SQLContext) error {
pkg, err := d.Package().GetUserPackageByName(tx, 1, r.bktName, r.pkgName)
if err == nil {
has, err := d.Object().HasObjectWithPrefix(tx, pkg.PackageID, name+cdssdk.ObjectPathSeparator)
if err != nil {
return err
}
if has {
return fuse.ErrNotEmpty
}
}

err = r.vfs.cache.Remove([]string{r.bktName, r.pkgName, name})
if err != nil {
return err
}

return d.Object().DeleteByPath(tx, pkg.PackageID, name)
})
if pkg.PackageID > 0 {
// 存储系统不会保存目录结构,所以这里是尝试删除同名文件
d.Object().DeleteByPath(tx, pkg.PackageID, name)
}

return nil
return nil
})
}

func (r *FusePackage) MoveChild(ctx context.Context, oldName string, newName string, newParent fuse.FsDir) error {


+ 28
- 29
client2/internal/mount/vfs/fuse_root.go View File

@@ -150,51 +150,50 @@ func (r *Root) NewDir(ctx context.Context, name string) (fuse.FsDir, error) {
return newBucketFromCache(cache.Info(), r.vfs), nil
}

func (r *Root) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, error) {
func (r *Root) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, uint32, error) {
cache := r.vfs.cache.CreateFile([]string{name})
if cache == nil {
return nil, fuse.ErrPermission
return nil, 0, fuse.ErrPermission
}
// Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭,
// 也有有FileHandle的计数保持cache的有效性

fileNode := newFileFromCache(cache.Info(), r.vfs)

if flags&uint32(os.O_WRONLY) != 0 {
hd := cache.Open(false)
return newFileHandle(fileNode, hd), nil
}

if flags&uint32(os.O_RDONLY) != 0 {
hd := cache.Open(true)
return newFileHandle(fileNode, hd), nil
}

return nil, fuse.ErrPermission
hd := cache.Open(flags)
return newFileHandle(fileNode, hd), flags, nil
}

func (r *Root) RemoveChild(ctx context.Context, name string) error {
err := r.vfs.cache.Remove([]string{name})
if err != nil {
return err
}

// TODO 生成系统事件
// 不关心是否成功
r.vfs.db.DoTx(func(tx db2.SQLContext) error {
d := r.vfs.db
db := r.vfs.db
return r.vfs.db.DoTx(func(tx db2.SQLContext) error {

bkt, err := d.Bucket().GetByName(tx, name)
if err != nil {
if err == gorm.ErrRecordNotFound {
return nil
bkt, err := db.Bucket().GetByName(tx, name)
if err == nil {
has, err := db.Package().HasPackageIn(tx, bkt.BucketID)
if err != nil {
return err
}
if has {
return fuse.ErrNotEmpty
}

} else if err != gorm.ErrRecordNotFound {
return err
}

return d.Bucket().DeleteComplete(tx, bkt.BucketID)
})
err = r.vfs.cache.Remove([]string{name})
if err != nil {
return err
}

return nil
if bkt.BucketID != 0 {
// 不管是否成功
db.Bucket().DeleteComplete(tx, bkt.BucketID)
}

return nil
})
}

func (r *Root) MoveChild(ctx context.Context, oldName string, newName string, newParent fuse.FsDir) error {


+ 12
- 0
common/pkgs/db2/package.go View File

@@ -190,3 +190,15 @@ func (*PackageDB) ChangeState(ctx SQLContext, packageID cdssdk.PackageID, state
err := ctx.Exec("UPDATE Package SET State = ? WHERE PackageID = ?", state, packageID).Error
return err
}

func (*PackageDB) HasPackageIn(ctx SQLContext, bucketID cdssdk.BucketID) (bool, error) {
var pkg cdssdk.Package
err := ctx.Table("Package").Where("BucketID = ?", bucketID).First(&pkg).Error
if err == gorm.ErrRecordNotFound {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}

Loading…
Cancel
Save