Browse Source

调整目录结构;实现文件分段缓存的部分逻辑

gitlink
Sydonian 1 year ago
parent
commit
5f3c9baf49
24 changed files with 1068 additions and 106 deletions
  1. +10
    -0
      client2/internal/mount/config/config.go
  2. +3
    -1
      client2/internal/mount/fuse/dir_handle.go
  3. +19
    -22
      client2/internal/mount/fuse/dir_node.go
  4. +6
    -49
      client2/internal/mount/fuse/file_handle.go
  5. +8
    -6
      client2/internal/mount/fuse/file_node.go
  6. +22
    -17
      client2/internal/mount/fuse/fuse.go
  7. +6
    -4
      client2/internal/mount/fuse/node.go
  8. +3
    -3
      client2/internal/mount/fuse/types.go
  9. +21
    -0
      client2/internal/mount/mount.go
  10. +107
    -0
      client2/internal/mount/vfs/cache/cache.go
  11. +50
    -0
      client2/internal/mount/vfs/cache/dir.go
  12. +135
    -0
      client2/internal/mount/vfs/cache/file.go
  13. +12
    -0
      client2/internal/mount/vfs/cache/file_saver.go
  14. +130
    -0
      client2/internal/mount/vfs/cache/file_segment.go
  15. +36
    -0
      client2/internal/mount/vfs/cache/file_segment_test.go
  16. +38
    -0
      client2/internal/mount/vfs/dir_reader.go
  17. +5
    -0
      client2/internal/mount/vfs/fuse.go
  18. +22
    -0
      client2/internal/mount/vfs/fuse_bucket.go
  19. +149
    -0
      client2/internal/mount/vfs/fuse_file.go
  20. +233
    -0
      client2/internal/mount/vfs/fuse_root.go
  21. +17
    -0
      client2/internal/mount/vfs/vfs.go
  22. +31
    -0
      common/pkgs/db2/bucket.go
  23. +3
    -3
      common/pkgs/db2/object.go
  24. +2
    -1
      coordinator/internal/mq/object.go

+ 10
- 0
client2/internal/mount/config/config.go View File

@@ -0,0 +1,10 @@
package config

import "time"

type Config struct {
CacheDir string `json:"cacheDir"`
GID uint32 `json:"gid"`
UID uint32 `json:"uid"`
AttrTimeout time.Duration `json:"attrTimeout"`
}

client2/internal/mount/dir_handle.go → client2/internal/mount/fuse/dir_handle.go View File

@@ -1,4 +1,6 @@
package mount
//go:build linux || (darwin && amd64)

package fuse

import (
"io"

client2/internal/mount/dir_node.go → client2/internal/mount/fuse/dir_node.go View File

@@ -1,4 +1,6 @@
package mount
//go:build linux || (darwin && amd64)

package fuse

import (
"context"
@@ -14,18 +16,18 @@ type DirNode struct {
dir FsDir
}

func newDirNode(vfs *Vfs, dir FsDir) *DirNode {
return &DirNode{NodeBase: NodeBase{vfs: vfs}, dir: dir}
func newDirNode(fs *Fuse, dir FsDir) *DirNode {
return &DirNode{NodeBase: NodeBase{fs: fs}, dir: dir}
}

func (n *DirNode) Getattr(ctx context.Context, f fusefs.FileHandle, out *fuse.AttrOut) syscall.Errno {
n.vfs.fillAttrOut(n.dir, out)
n.fs.fillAttrOut(n.dir, out)
return 0
}

// Setattr sets attributes for an Inode.
func (n *DirNode) Setattr(ctx context.Context, f fusefs.FileHandle, in *fuse.SetAttrIn, out *fuse.AttrOut) (errno syscall.Errno) {
n.vfs.fillAttrOut(n.dir, out)
n.fs.fillAttrOut(n.dir, out)

_, ok := in.GetSize()
if ok {
@@ -76,16 +78,16 @@ func (n *DirNode) Lookup(ctx context.Context, name string, out *fuse.EntryOut) (

switch child := child.(type) {
case FsDir:
node := newDirNode(n.vfs, child)
n.vfs.fillEntryOut(child, out)
node := newDirNode(n.fs, child)
n.fs.fillEntryOut(child, out)

return node.NewInode(ctx, node, fusefs.StableAttr{
Mode: out.Attr.Mode,
}), 0

case FsFile:
node := newFileNode(n.vfs, child)
n.vfs.fillEntryOut(child, out)
node := newFileNode(n.fs, child)
n.fs.fillEntryOut(child, out)

return node.NewInode(ctx, node, fusefs.StableAttr{
Mode: out.Attr.Mode,
@@ -106,7 +108,7 @@ var _ = (fusefs.NodeOpendirer)((*DirNode)(nil))

type dirStream struct {
reader DirReader
vfs *Vfs
fs *Fuse
}

func (s *dirStream) HasNext() bool {
@@ -122,7 +124,7 @@ func (s *dirStream) Next() (fuse.DirEntry, syscall.Errno) {

return fuse.DirEntry{
Name: entry.Name(),
Mode: s.vfs.getMode(entry),
Mode: s.fs.getMode(entry),
}, 0
}

@@ -136,7 +138,7 @@ func (n *DirNode) Readdir(ctx context.Context) (ds fusefs.DirStream, errno sysca
return nil, translateError(err)
}

return &dirStream{reader: reader, vfs: n.vfs}, 0
return &dirStream{reader: reader, fs: n.fs}, 0
}

var _ = (fusefs.NodeReaddirer)((*DirNode)(nil))
@@ -147,8 +149,8 @@ func (n *DirNode) Mkdir(ctx context.Context, name string, mode uint32, out *fuse
return nil, translateError(err)
}

node := newDirNode(n.vfs, newDir)
n.vfs.fillEntryOut(newDir, out)
node := newDirNode(n.fs, newDir)
n.fs.fillEntryOut(newDir, out)

return node.NewInode(ctx, node, fusefs.StableAttr{
Mode: out.Attr.Mode,
@@ -158,19 +160,14 @@ func (n *DirNode) Mkdir(ctx context.Context, name string, mode uint32, out *fuse
var _ = (fusefs.NodeMkdirer)((*DirNode)(nil))

func (n *DirNode) Create(ctx context.Context, name string, flags uint32, mode uint32, out *fuse.EntryOut) (node *fusefs.Inode, fh fusefs.FileHandle, fuseFlags uint32, errno syscall.Errno) {
newFile, err := n.dir.NewFile(ctx, name, flags)
if err != nil {
return nil, nil, 0, translateError(err)
}

hd, err := newFile.Open(flags)
hd, err := n.dir.NewFile(ctx, name, flags)
if err != nil {
return nil, nil, 0, translateError(err)
}

n.vfs.fillEntryOut(newFile, out)
n.fs.fillEntryOut(hd.Entry(), out)

fileNode := newFileNode(n.vfs, newFile)
fileNode := newFileNode(n.fs, hd.Entry())
return fileNode.NewInode(ctx, fileNode, fusefs.StableAttr{
Mode: out.Attr.Mode,
}), hd, 0, 0

client2/internal/mount/file_handle.go → client2/internal/mount/fuse/file_handle.go View File

@@ -1,4 +1,6 @@
package mount
//go:build linux || (darwin && amd64)

package fuse

import (
"context"
@@ -11,16 +13,12 @@ import (

// 作为本模块的FileHandle与fusefs.FileHandle的桥梁,用于实现fusefs.FileHandle接口
type fileHandle struct {
hd FileHandle
file FsFile
vfs *Vfs
hd FileHandle
}

func newFileHandle(hd FileHandle, file FsFile, vfs *Vfs) *fileHandle {
func newFileHandle(hd FileHandle) *fileHandle {
return &fileHandle{
hd: hd,
file: file,
vfs: vfs,
hd: hd,
}
}

@@ -76,44 +74,3 @@ func (f *fileHandle) Fsync(ctx context.Context, flags uint32) (errno syscall.Err
}

var _ fusefs.FileFsyncer = (*fileHandle)(nil)

// Getattr reads attributes for an Inode. The library will ensure that
// Mode and Ino are set correctly. For files that are not opened with
// FOPEN_DIRECTIO, Size should be set so it can be read correctly. If
// returning zeroed permissions, the default behavior is to change the
// mode of 0755 (directory) or 0644 (files). This can be switched off
// with the Options.NullPermissions setting. If blksize is unset, 4096
// is assumed, and the 'blocks' field is set accordingly.
func (f *fileHandle) Getattr(ctx context.Context, out *fuse.AttrOut) (errno syscall.Errno) {
f.vfs.fillAttrOut(f.file, out)
return 0
}

var _ fusefs.FileGetattrer = (*fileHandle)(nil)

// Setattr sets attributes for an Inode.
func (f *fileHandle) Setattr(ctx context.Context, in *fuse.SetAttrIn, out *fuse.AttrOut) (errno syscall.Errno) {
var err error
f.vfs.fillAttrOut(f.file, out)
size, ok := in.GetSize()
if ok {
err = f.file.Truncate(size)
if err != nil {
return translateError(err)
}
out.Attr.Size = size
}

mtime, ok := in.GetMTime()
if ok {
err = f.file.SetModTime(mtime)
if err != nil {
return translateError(err)
}
out.Attr.Mtime = uint64(mtime.Unix())
out.Attr.Mtimensec = uint32(mtime.Nanosecond())
}
return 0
}

var _ fusefs.FileSetattrer = (*fileHandle)(nil)

client2/internal/mount/file_node.go → client2/internal/mount/fuse/file_node.go View File

@@ -1,4 +1,6 @@
package mount
//go:build linux || (darwin && amd64)

package fuse

import (
"context"
@@ -13,20 +15,20 @@ type FileNode struct {
file FsFile
}

func newFileNode(vfs *Vfs, file FsFile) *FileNode {
func newFileNode(fs *Fuse, file FsFile) *FileNode {
return &FileNode{
NodeBase: NodeBase{vfs: vfs},
NodeBase: NodeBase{fs: fs},
file: file,
}
}

func (n *FileNode) Getattr(ctx context.Context, f fusefs.FileHandle, out *fuse.AttrOut) syscall.Errno {
n.vfs.fillAttrOut(n.file, out)
n.fs.fillAttrOut(n.file, out)
return 0
}

func (n *FileNode) Setattr(ctx context.Context, f fusefs.FileHandle, in *fuse.SetAttrIn, out *fuse.AttrOut) (errno syscall.Errno) {
n.vfs.fillAttrOut(n.file, out)
n.fs.fillAttrOut(n.file, out)

size, ok := in.GetSize()
if ok {
@@ -67,7 +69,7 @@ func (n *FileNode) Open(ctx context.Context, flags uint32) (fh fusefs.FileHandle
return nil, 0, translateError(err)
}

return hd, 0, 0
return newFileHandle(hd), 0, 0
}

var _ = (fusefs.NodeOpener)((*FileNode)(nil))

client2/internal/mount/vfs.go → client2/internal/mount/fuse/fuse.go View File

@@ -1,22 +1,28 @@
package mount
//go:build linux || (darwin && amd64)

package fuse

import (
"os"
"syscall"
"time"

fusefs "github.com/hanwen/go-fuse/v2/fs"
"github.com/hanwen/go-fuse/v2/fuse"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/config"
)

type Config struct {
GID uint32 `json:"gid"`
UID uint32 `json:"uid"`
AttrTimeout time.Duration `json:"attrTimeout"`
type Fuse struct {
fs Fs
rootDir FsDir
config *config.Config
}

func NewFuse(fs Fs, rootDir FsDir, config *config.Config) *Fuse {
return &Fuse{fs: fs, rootDir: rootDir, config: config}
}

type Vfs struct {
fs Fs
config Config
func (v *Fuse) Root() fusefs.InodeEmbedder {
return newDirNode(v, v.rootDir)
}

func translateError(err error) syscall.Errno {
@@ -41,8 +47,8 @@ func translateError(err error) syscall.Errno {
}
}

// get the Mode from a vfs Node
func (v *Vfs) getMode(node FsEntry) uint32 {
// get the Mode from a fs Node
func (v *Fuse) getMode(node FsEntry) uint32 {
Mode := node.Mode().Perm()
if node.IsDir() {
Mode |= fuse.S_IFDIR
@@ -53,7 +59,7 @@ func (v *Vfs) getMode(node FsEntry) uint32 {
}

// fill in attr from node
func (v *Vfs) fillAttr(node FsEntry, attr *fuse.Attr) {
func (v *Fuse) fillAttr(node FsEntry, attr *fuse.Attr) {
Size := uint64(node.Size())
const BlockSize = 512
Blocks := (Size + BlockSize - 1) / BlockSize
@@ -65,7 +71,6 @@ func (v *Vfs) fillAttr(node FsEntry, attr *fuse.Attr) {
attr.Blocks = Blocks
// attr.Blksize = BlockSize // not supported in freebsd/darwin, defaults to 4k if not set

createTime := node.CreateTime()
modTime := node.ModTime()

attr.Atime = uint64(modTime.Unix())
@@ -74,16 +79,16 @@ func (v *Vfs) fillAttr(node FsEntry, attr *fuse.Attr) {
attr.Mtime = uint64(modTime.Unix())
attr.Mtimensec = uint32(modTime.Nanosecond())

attr.Ctime = uint64(createTime.Unix())
attr.Ctimensec = uint32(createTime.Nanosecond())
attr.Ctime = uint64(modTime.Unix())
attr.Ctimensec = uint32(modTime.Nanosecond())
}

func (v *Vfs) fillAttrOut(node FsEntry, out *fuse.AttrOut) {
func (v *Fuse) fillAttrOut(node FsEntry, out *fuse.AttrOut) {
v.fillAttr(node, &out.Attr)
out.SetTimeout(v.config.AttrTimeout)
}

func (v *Vfs) fillEntryOut(node FsEntry, out *fuse.EntryOut) {
func (v *Fuse) fillEntryOut(node FsEntry, out *fuse.EntryOut) {
v.fillAttr(node, &out.Attr)
out.SetAttrTimeout(v.config.AttrTimeout)
out.SetEntryTimeout(v.config.AttrTimeout)

client2/internal/mount/node.go → client2/internal/mount/fuse/node.go View File

@@ -1,4 +1,6 @@
package mount
//go:build linux || (darwin && amd64)

package fuse

import (
"context"
@@ -10,7 +12,7 @@ import (

type NodeBase struct {
fusefs.Inode
vfs *Vfs
fs *Fuse
}

// Statfs implements statistics for the filesystem that holds this
@@ -20,9 +22,9 @@ type NodeBase struct {
func (n *NodeBase) Statfs(ctx context.Context, out *fuse.StatfsOut) syscall.Errno {
const blockSize = 4096

stats := n.vfs.fs.Stats()
stats := n.fs.fs.Stats()

// total, _, free := n.fsys.VFS.Statfs()
// total, _, free := n.fsys.fs.Statfs()
out.Blocks = uint64(stats.TotalDataBytes) / blockSize // Total data blocks in file system.
out.Bfree = 1e9 // Free blocks in file system.
out.Bavail = out.Bfree // Free blocks in file system if you're not root.

client2/internal/mount/types.go → client2/internal/mount/fuse/types.go View File

@@ -1,4 +1,4 @@
package mount
package fuse

import (
"context"
@@ -28,7 +28,6 @@ type FsEntry interface {
Size() int64
Mode() os.FileMode
ModTime() time.Time
CreateTime() time.Time
IsDir() bool
}

@@ -41,7 +40,7 @@ type FsDir interface {
Children(ctx context.Context) ([]FsEntry, error)
ReadChildren() (DirReader, error)
NewDir(ctx context.Context, name string) (FsDir, error)
NewFile(ctx context.Context, name string, flags uint32) (FsFile, error)
NewFile(ctx context.Context, name string, flags uint32) (FileHandle, error)
RemoveChild(ctx context.Context, name string) error
MoveChild(ctx context.Context, oldName string, newName string, newParent FsDir) error
}
@@ -61,6 +60,7 @@ type DirReader interface {
}

type FileHandle interface {
Entry() FsFile
ReadAt(buf []byte, off int64) (int, error)
WriteAt(buf []byte, off int64) (int, error)
Sync() error

+ 21
- 0
client2/internal/mount/mount.go View File

@@ -0,0 +1,21 @@
package mount

import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

type Mount struct{}

func (m *Mount) Start() {

}

func (m *Mount) NotifyObjectInvalid(obj cdssdk.Object) {

}

func (m *Mount) NotifyPackageInvalid(pkg cdssdk.Package) {

}

func (m *Mount) NotifyBucketInvalid(bkt cdssdk.Bucket) {

}

+ 107
- 0
client2/internal/mount/vfs/cache/cache.go View File

@@ -0,0 +1,107 @@
package cache

import (
"os"
"path/filepath"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse"
)

type CacheEntry interface {
fuse.FsEntry
// 在虚拟文件系统中的路径,即不包含缓存目录的路径
PathComps() []string
// 不再使用本缓存条目
Release()
}

type Cache struct {
cacheDataDir string
cacheMetaDir string
}

func (c *Cache) GetCacheDataPath(comps ...string) string {
comps2 := make([]string, len(comps)+1)
comps2[0] = c.cacheDataDir
copy(comps2[1:], comps)
return filepath.Join(comps2...)
}

func (c *Cache) GetCacheDataPathComps(comps ...string) []string {
comps2 := make([]string, len(comps)+1)
comps2[0] = c.cacheDataDir
copy(comps2[1:], comps)
return comps2
}

func (c *Cache) GetCacheMetaPath(comps ...string) string {
comps2 := make([]string, len(comps)+1)
comps2[0] = c.cacheMetaDir
copy(comps2[1:], comps)
return filepath.Join(comps2...)
}

func (c *Cache) GetCacheMetaPathComps(comps ...string) []string {
comps2 := make([]string, len(comps)+1)
comps2[0] = c.cacheMetaDir
copy(comps2[1:], comps)
return comps2
}

// 加载指定路径的缓存文件或者目录,如果路径不存在,则返回nil。
func (c *Cache) LoadAny(pathComps []string) CacheEntry {
pat := c.GetCacheMetaPath(pathComps...)
info, err := os.Stat(pat)
if err != nil {
// TODO 日志记录
return nil
}

if info.IsDir() {
return newCacheDir(pathComps, info)
}

file, err := loadCacheFile(pathComps, pat)
if err != nil {
// TODO 日志记录
return nil
}
return file
}

// 加载指定缓存文件,如果文件不存在,则根据create参数决定是否创建文件。
//
// 如果create为false,且文件不存在,则返回nil。如果目标位置是一个目录,则也会返回nil。
func (c *Cache) LoadFile(pathComps []string, create bool) *CacheFile {

}

// 加载指定缓存文件,如果文件不存在,则根据obj参数创建缓存文件。
func (c *Cache) LoadOrCreateFile(pathComps []string, obj cdssdk.Object) *CacheFile {

}

// 加载指定缓存目录,如果目录不存在,则根据create参数决定是否创建目录。
//
// 如果create为false,且目录不存在,则返回nil。如果目标位置是一个文件,则也会返回nil。
func (c *Cache) LoadDir(pathComps []string, create bool) *CacheDir {

}

// 加载指定路径下所有的缓存文件或者目录,如果路径不存在,则返回nil。
func (c *Cache) LoadMany(pathComps []string) []CacheEntry {

}

// 删除指定路径的缓存文件或目录。删除目录时如果目录不为空,则会报错。
func (c *Cache) Remove(pathComps []string) error {

}

// 移动指定路径的缓存文件或目录到新的路径。如果目标路径已经存在,则会报错。
//
// 如果移动成功,则返回移动后的缓存文件或目录。如果文件或目录不存在,则返回nil。
func (c *Cache) Move(pathComps []string, newPathComps []string) (CacheEntry, error) {

}

+ 50
- 0
client2/internal/mount/vfs/cache/dir.go View File

@@ -0,0 +1,50 @@
package cache

import (
"os"
"time"
)

type CacheDir struct {
pathComps []string
name string
modTime time.Time
perm os.FileMode
}

func newCacheDir(pathComps []string, info os.FileInfo) *CacheDir {
return &CacheDir{
pathComps: pathComps,
name: info.Name(),
modTime: info.ModTime(),
perm: info.Mode().Perm(),
}
}

func (f *CacheDir) PathComps() []string {
return f.pathComps
}

func (f *CacheDir) Name() string {
return f.name
}

func (f *CacheDir) Size() int64 {
return 0
}

func (f *CacheDir) Mode() os.FileMode {
return f.perm | os.ModeDir
}

func (f *CacheDir) ModTime() time.Time {
return f.modTime
}

func (f *CacheDir) IsDir() bool {
return true
}

func (f *CacheDir) Release() {

}

+ 135
- 0
client2/internal/mount/vfs/cache/file.go View File

@@ -0,0 +1,135 @@
package cache

import (
"os"
"path/filepath"
"sync"
"time"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/serder"
)

type FileInfo struct {
Dirty bool // 本文件是否有未提交的修改
Segments []Range // 数据段列表,按照段开始位置从小到大排列
ObjectID cdssdk.ObjectID // 文件对应的对象ID,仅在文件是一个缓存文件时才有值
Hash string // 如果本文件完全是一个缓存文件,那么这个字段记录了其内容的哈希值,用于在下载缓存数据时,检查远端文件是否被修改过
Size int64 // 文件大小。如果是缓存文件,那么这个字段记录了其内容的总大小,用于判断文件是否完整。如果是本地文件,那么这个字段记录了其实际大小。
ModTime time.Time // 文件的最后修改时间
Perm os.FileMode // 文件的权限
}

type Range struct {
Position int64
Length int64
}

// 所有读写过程共用同一个CacheFile对象。
// 不应该将此结构体保存到对象中
type CacheFile struct {
pathComps []string
name string
info FileInfo
lock sync.RWMutex
segBuffer SegmentBuffer
readers int
writers int
}

func loadCacheFile(pathComps []string, infoPath string) (*CacheFile, error) {
f, err := os.Open(infoPath)
if err != nil {
return nil, err
}
defer f.Close()

info := &FileInfo{}
err = serder.JSONToObjectStream(f, info)
if err != nil {
return nil, err
}

return &CacheFile{
pathComps: pathComps,
name: filepath.Base(infoPath),
info: *info,
}, nil
}

func (f *CacheFile) PathComps() []string {
return f.pathComps
}

func (f *CacheFile) Name() string {
return f.name
}

func (f *CacheFile) Size() int64 {
return f.info.Size
}

func (f *CacheFile) Mode() os.FileMode {
return f.info.Perm
}

func (f *CacheFile) ModTime() time.Time {
return f.info.ModTime
}

func (f *CacheFile) IsDir() bool {
return false
}

// 打开一个写入句柄,同时支持读取
func (f *CacheFile) Open(readOnly bool) *CacheFileReadWriter {
f.lock.Lock()
defer f.lock.Unlock()

if readOnly {
f.readers++
} else {
f.writers++
}

return &CacheFileReadWriter{
file: f,
readOnly: readOnly,
}
}

func (f *CacheFile) SetModTime(modTime time.Time) error {
}

func (f *CacheFile) Truncate(size int64) error {
}

// 不再使用缓存文件
func (f *CacheFile) Release() {

}

// 一个文件段的数据被写入到本地了
func (f *CacheFile) OnSaved(seg *FileSegment) {

}

type CacheFileReadWriter struct {
file *CacheFile
readOnly bool
}

func (f *CacheFileReadWriter) ReadAt(buf []byte, off int64) (int, error) {

}

func (f *CacheFileReadWriter) WriteAt(buf []byte, off int64) (int, error) {

}

func (f *CacheFileReadWriter) Sync() error {
}

func (f *CacheFileReadWriter) Close() error {

}

+ 12
- 0
client2/internal/mount/vfs/cache/file_saver.go View File

@@ -0,0 +1,12 @@
package cache

import "os"

type FileSaver struct {
owner *CacheFile
fileHd *os.File
}

func (s *FileSaver) BeginSaving(seg *FileSegment) {

}

+ 130
- 0
client2/internal/mount/vfs/cache/file_segment.go View File

@@ -0,0 +1,130 @@
package cache

import (
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/common/utils/sync2"
)

type SegmentType int

const (
// 数据来自本地文件
SegmentLocal = iota
// 数据来自远端文件,还未写入到本地文件
SegmentRemote
// 数据由用户写入,还未写入到本地文件
SegmentDirty
)

type FileSegment struct {
Position int64
// 有效数据的长度。不一定等于Buffer的长度
Length int64
Type SegmentType
// 文件数据缓冲区。不可对此缓冲区进行append操作!
Buffer []byte
// 当前段是否正在被保存到本地文件中
IsSaving bool
// 引用计数。当引用计数为0时,可以安全地删除此段
RefBuzyCount int
}

func (s *FileSegment) SubSliceAbs(pos int64) []byte {
return s.Buffer[pos-s.Position:]
}

// 将当前段拆分为两个段。当前段将持有第一个段,返回值持有第二个段
func (s *FileSegment) SplitAbs(pos int64) *FileSegment {
s2 := s.SubSliceAbs(pos)
s2Len := math2.Max(s.Position+s.Length-pos, 0)

s.Buffer = s.Buffer[:pos-s.Position]
s.Length = math2.Min(int64(len(s.Buffer)), s.Length)

return &FileSegment{
Position: pos,
Length: s2Len,
Type: s.Type,
Buffer: s2,
}
}

type SegmentBuffer struct {
// 正在使用的文件段缓冲区
Buzys []*FileSegment
// 完全空闲的文件段缓冲区,可以作为新段使用
frees *sync2.BucketPool[*FileSegment]
}

func (s *SegmentBuffer) BuzyCount() int {
return len(s.Buzys)
}

// 申请一个空闲段,如果暂时没有空闲段,则会阻塞等待
func (s *SegmentBuffer) AcquireFree() *FileSegment {
s.frees.GetEmpty()
}

// 查找第一个包含指定位置的段索引。如果所有段都不包含指定位置,那么有以下三种情况:
//
// 1. pos小于第一个段的位置,返回-1
//
// 2. pos大于等于最后一个段的结束位置,返回BuzyCount() - 1
//
// 3. pos在段之间的空洞内,那么会返回小于pos的最后一个段
//
// 注:2、3情况返回的结果是相同的
func (s *SegmentBuffer) FirstContains(pos int64) int {
low, high := 0, len(s.Buzys)-1

for low <= high {
mid := (low + high) / 2

if s.Buzys[mid].Position > pos {
high = mid - 1
} else if s.Buzys[mid].Position < pos {
if mid == s.BuzyCount()-1 {
return mid
}

low = mid

} else {
return mid
}
}

return high
}

// 将指定段插入到段缓存的恰当位置
func (s *SegmentBuffer) Insert(seg *FileSegment) {
index := s.FirstContains(seg.Position)

if index == -1 {
s.Buzys = append([]*FileSegment{seg}, s.Buzys...)
} else {
// index是最后一个小于Position的位置,所以要加1
s.Buzys = lo2.Insert(s.Buzys, index+1, seg)
}
}

// 插入一个段到指定索引位置。不会检查插入后Segments是否依然保持有序。
func (s *SegmentBuffer) InsertAt(index int, seg *FileSegment) {
s.Buzys = lo2.Insert(s.Buzys, index, seg)
}

// 将指定位置的段从Buzys中移除,并放入Frees中
func (s *SegmentBuffer) FreeAt(index int) {
buf := s.Buzys[index]
s.Buzys = append(s.Buzys[:index], s.Buzys[index+1:]...)
s.Frees = append(s.Frees, buf)
}

// 将指定段从Buzys中移除,并放入Frees中
func (s *SegmentBuffer) Free(seg *FileSegment) {
index := lo.IndexOf(s.Buzys, seg)
s.FreeAt(index)
}

+ 36
- 0
client2/internal/mount/vfs/cache/file_segment_test.go View File

@@ -0,0 +1,36 @@
package cache

import (
"fmt"
"testing"

. "github.com/smartystreets/goconvey/convey"
)

func Test_FindSegmentIndex(t *testing.T) {
cases := []struct {
title string
buffer SegmentBuffer
postions []int64
wants []int
}{
{
buffer: SegmentBuffer{
Buzys: []*FileSegment{
{Position: 0}, {Position: 10},
},
},
postions: []int64{0, 2, 10, 11},
wants: []int{0, 0, 1, 1},
},
}

for i, c := range cases {
for j, _ := range c.postions {
Convey(fmt.Sprintf("%d.%d. %s", i, j, c.title), t, func() {
got := c.buffer.FirstContains(c.postions[j])
So(got, ShouldEqual, c.wants[j])
})
}
}
}

+ 38
- 0
client2/internal/mount/vfs/dir_reader.go View File

@@ -0,0 +1,38 @@
package vfs

import "gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse"

type FuseDirReader struct {
allEntries []fuse.FsEntry
nextReadIndex int
}

func newFuseDirReader(entries []fuse.FsEntry) *FuseDirReader {
return &FuseDirReader{
allEntries: entries,
nextReadIndex: 0,
}
}

func (r *FuseDirReader) HasNext() bool {
return r.nextReadIndex < len(r.allEntries)
}

func (r *FuseDirReader) Next(n int) ([]fuse.FsEntry, error) {
if r.nextReadIndex >= len(r.allEntries) {
return nil, nil
}
if n > len(r.allEntries)-r.nextReadIndex {
n = len(r.allEntries) - r.nextReadIndex
}
entries := make([]fuse.FsEntry, n)
for i := 0; i < n; i++ {
entries[i] = r.allEntries[r.nextReadIndex+i]
}
r.nextReadIndex += n
return entries, nil
}

func (r *FuseDirReader) Close() {

}

+ 5
- 0
client2/internal/mount/vfs/fuse.go View File

@@ -0,0 +1,5 @@
package vfs

type FuseNode interface {
PathComps() []string
}

+ 22
- 0
client2/internal/mount/vfs/fuse_bucket.go View File

@@ -0,0 +1,22 @@
package vfs

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache"
)

type FuseBucket struct {
}

func newBucketOrFileFromCache(c cache.CacheEntry, vfs *Vfs) fuse.FsEntry {

}

func newBucketFromCache(c *cache.CacheDir, vfs *Vfs) fuse.FsDir {

}

func newBucketFromDB(bkt cdssdk.Bucket, vfs *Vfs) fuse.FsEntry {

}

+ 149
- 0
client2/internal/mount/vfs/fuse_file.go View File

@@ -0,0 +1,149 @@
package vfs

import (
"os"
"time"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache"
)

type FuseFileNode struct {
vfs *Vfs
pathComps []string
size int64
modTime time.Time
mode os.FileMode
}

func newFile(vfs *Vfs, cache *cache.CacheFile) *FuseFileNode {
return &FuseFileNode{
vfs: vfs,
pathComps: cache.PathComps(),
size: cache.Size(),
modTime: cache.ModTime(),
mode: cache.Mode(),
}
}

func (n *FuseFileNode) Name() string {
return n.pathComps[len(n.pathComps)-1]
}

func (n *FuseFileNode) Size() int64 {
return n.size
}

func (n *FuseFileNode) Mode() os.FileMode {
return n.mode
}

func (n *FuseFileNode) ModTime() time.Time {
return n.modTime
}

func (n *FuseFileNode) IsDir() bool {
return false
}

func (n *FuseFileNode) Truncate(size uint64) error {
cacheFile := n.loadCacheFile()
if cacheFile == nil {
return fuse.ErrNotExists
}

return cacheFile.Truncate(int64(size))
}

func (n *FuseFileNode) SetModTime(time time.Time) error {
cacheFile := n.loadCacheFile()
if cacheFile == nil {
return fuse.ErrNotExists
}
defer cacheFile.Release()

return cacheFile.SetModTime(time)
}

func (n *FuseFileNode) Open(flags uint32) (fuse.FileHandle, error) {
cacheFile := n.loadCacheFile()
if cacheFile == nil {
// 如果文件不存在,也不进行创建,因为创建不应该调用这个接口
return nil, fuse.ErrNotExists
}
defer cacheFile.Release()

if flags&uint32(os.O_WRONLY) != 0 {
hd := cacheFile.Open(false)
return newFileHandle(n, hd), nil
}

if flags&uint32(os.O_RDONLY) != 0 {
hd := cacheFile.Open(true)
return newFileHandle(n, hd), nil
}

return nil, fuse.ErrPermission
}

func (n *FuseFileNode) loadCacheFile() *cache.CacheFile {
cdsObj := n.loadCDSObject()

if cdsObj != nil {
return n.vfs.cache.LoadOrCreateFile(n.pathComps, *cdsObj)
}

return n.vfs.cache.LoadFile(n.pathComps, false)
}

func (n *FuseFileNode) loadCDSObject() *cdssdk.Object {
if len(n.pathComps) >= 3 {
pkg, err := n.vfs.db.Package().GetUserPackageByName(n.vfs.db.DefCtx(), 1, n.pathComps[0], n.pathComps[1])
if err == nil {
obj, err := n.vfs.db.Object().GetByPath(n.vfs.db.DefCtx(), pkg.PackageID, cdssdk.JoinObjectPath(n.pathComps[2:]...))
if err == nil {
return &obj
}
}
}

return nil
}

type FuseFileHandle struct {
entry *FuseFileNode
readWriter *cache.CacheFileReadWriter
}

func newFileHandle(entry *FuseFileNode, readWriter *cache.CacheFileReadWriter) *FuseFileHandle {
return &FuseFileHandle{
entry: entry,
readWriter: readWriter,
}
}

func (hd *FuseFileHandle) Entry() fuse.FsFile {
return hd.entry
}

func (hd *FuseFileHandle) ReadAt(buf []byte, off int64) (int, error) {
return hd.readWriter.ReadAt(buf, off)
}

func (hd *FuseFileHandle) WriteAt(buf []byte, off int64) (int, error) {
return hd.readWriter.WriteAt(buf, off)
}

func (hd *FuseFileHandle) Sync() error {
return hd.readWriter.Sync()
}

func (hd *FuseFileHandle) Close() error {
return hd.readWriter.Close()
}

func (hd *FuseFileHandle) Release() error {
// TODO 其他清理工作
return nil
}

+ 233
- 0
client2/internal/mount/vfs/fuse_root.go View File

@@ -0,0 +1,233 @@
package vfs

import (
"context"
"os"
"time"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/fuse"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"
"gorm.io/gorm"
)

type Root struct {
vfs *Vfs
}

func (r *Root) PathComps() []string {
return []string{}
}

func (r *Root) Name() string {
return ""
}

func (r *Root) Size() int64 {
return 0
}

func (r *Root) Mode() os.FileMode {
return os.ModeDir | 0755
}

func (r *Root) ModTime() time.Time {
return time.Now()
}

func (r *Root) CreateTime() time.Time {
return time.Now()
}

func (r *Root) IsDir() bool {
return true
}

func (r *Root) SetModTime(time time.Time) error {
return nil
}

// 如果不存在,应该返回ErrNotExists
func (r *Root) Child(ctx context.Context, name string) (fuse.FsEntry, error) {
cache := r.vfs.cache.LoadAny([]string{name})
if cache != nil {
defer cache.Release()

return newBucketOrFileFromCache(cache, r.vfs), nil
}

bkt, err := r.vfs.db.Bucket().GetByName(r.vfs.db.DefCtx(), name)
if err != nil {
if err == gorm.ErrRecordNotFound {
return nil, fuse.ErrNotExists
}

return nil, err
}

return newBucketFromDB(bkt, r.vfs), nil
}

func (r *Root) Children(ctx context.Context) ([]fuse.FsEntry, error) {
var ens []fuse.FsEntry

exists := make(map[string]bool)

caches := r.vfs.cache.LoadMany([]string{})
for _, c := range caches {
exists[c.Name()] = true
ens = append(ens, newBucketOrFileFromCache(c, r.vfs))
c.Release()
}

remotes, err := r.vfs.db.Bucket().GetAll(r.vfs.db.DefCtx())
if err != nil {
return nil, err
}
for _, remote := range remotes {
if !exists[remote.Name] {
ens = append(ens, newBucketFromDB(remote, r.vfs))
}
}

return ens, nil
}

func (r *Root) ReadChildren() (fuse.DirReader, error) {
caches := r.vfs.cache.LoadMany([]string{})
bkts, err := r.vfs.db.Bucket().GetAll(r.vfs.db.DefCtx())
if err != nil {
return nil, err
}

exists := make(map[string]fuse.FsEntry)
for _, c := range caches {
exists[c.Name()] = newBucketOrFileFromCache(c, r.vfs)
c.Release()
}

for _, bkt := range bkts {
if _, ok := exists[bkt.Name]; !ok {
exists[bkt.Name] = newBucketFromDB(bkt, r.vfs)
}
}

return newFuseDirReader(lo.Values(exists)), nil
}

func (r *Root) NewDir(ctx context.Context, name string) (fuse.FsDir, error) {
cache := r.vfs.cache.LoadDir([]string{name}, true)
if cache == nil {
return nil, fuse.ErrPermission
}
defer cache.Release()

// TODO 用户ID,失败了可以打个日志
// TODO 生成系统事件
// 不关注创建是否成功,仅尝试一下
r.vfs.db.Bucket().Create(r.vfs.db.DefCtx(), 1, name)

return newBucketFromCache(cache, r.vfs), nil
}

func (r *Root) NewFile(ctx context.Context, name string, flags uint32) (fuse.FileHandle, error) {
cache := r.vfs.cache.LoadFile([]string{name}, true)
if cache == nil {
return nil, fuse.ErrPermission
}
defer cache.Release()
// Open之后会给cache的引用计数额外+1,即使cache先于FileHandle被关闭,
// 也有有FileHandle的计数保持cache的有效性

fileNode := newFile(r.vfs, cache)

if flags&uint32(os.O_WRONLY) != 0 {
hd, err := cache.Open(false)
if err != nil {
return nil, err
}

return newFileHandle(fileNode, hd), nil
}

if flags&uint32(os.O_RDONLY) != 0 {
hd, err := cache.Open(true)
if err != nil {
return nil, err
}

return newFileHandle(fileNode, hd), nil
}

return nil, fuse.ErrPermission
}

func (r *Root) RemoveChild(ctx context.Context, name string) error {
err := r.vfs.cache.Remove([]string{name})
if err != nil {
return err
}

// TODO 生成系统事件
// 不关心是否成功
r.vfs.db.DoTx(func(tx db2.SQLContext) error {
d := r.vfs.db

bkt, err := d.Bucket().GetByName(tx, name)
if err != nil {
if err == gorm.ErrRecordNotFound {
return nil
}
}

return d.Bucket().DeleteComplete(tx, bkt.BucketID)
})

return nil
}

func (r *Root) MoveChild(ctx context.Context, oldName string, newName string, newParent fuse.FsDir) error {
newParentNode := newParent.(FuseNode)
_, err := r.vfs.cache.Move([]string{oldName}, append(newParentNode.PathComps(), newName))
if err != nil {
return err
}

d := r.vfs.db
// 如果目标节点是根节点,那么就是重命名桶
if _, ok := newParent.(*Root); ok {
d.DoTx(func(tx db2.SQLContext) error {
_, err := d.Bucket().GetByName(tx, newName)
if err == nil {
// 目标节点已经存在,不能重命名,直接退出
return err
}

oldBkt, err := d.Bucket().GetByName(tx, oldName)
if err != nil {
// 源节点不存在,直接退出
return err
}

// 不关注重命名是否成功,仅尝试一下
return d.Bucket().Rename(tx, oldBkt.BucketID, newName)
})
} else {
// TODO 做法存疑
// 其他情况则删除旧桶
d.DoTx(func(tx db2.SQLContext) error {
oldBkt, err := d.Bucket().GetByName(tx, oldName)
if err != nil {
// 源节点不存在,直接退出
return err
}

return d.Bucket().DeleteComplete(tx, oldBkt.BucketID)
})
}

return nil
}

var _ fuse.FsDir = (*Root)(nil)
var _ FuseNode = (*Root)(nil)

+ 17
- 0
client2/internal/mount/vfs/vfs.go View File

@@ -0,0 +1,17 @@
package vfs

import (
"gitlink.org.cn/cloudream/storage/client2/internal/mount/config"
"gitlink.org.cn/cloudream/storage/client2/internal/mount/vfs/cache"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"
)

type Vfs struct {
db *db2.DB
config *config.Config
cache *cache.Cache
}

func (v *Vfs) Root() {

}

+ 31
- 0
common/pkgs/db2/bucket.go View File

@@ -24,6 +24,12 @@ func (db *BucketDB) GetByID(ctx SQLContext, bucketID cdssdk.BucketID) (cdssdk.Bu
return ret, err
}

func (db *BucketDB) GetByName(ctx SQLContext, bucketName string) (cdssdk.Bucket, error) {
var ret cdssdk.Bucket
err := ctx.Table("Bucket").Where("Name = ?", bucketName).First(&ret).Error
return ret, err
}

// GetIDByName 根据BucketName查询BucketID
func (db *BucketDB) GetIDByName(ctx SQLContext, bucketName string) (int64, error) {
var result struct {
@@ -39,6 +45,12 @@ func (db *BucketDB) GetIDByName(ctx SQLContext, bucketName string) (int64, error
return result.BucketID, nil
}

func (*BucketDB) GetAll(ctx SQLContext) ([]cdssdk.Bucket, error) {
var ret []cdssdk.Bucket
err := ctx.Table("Bucket").Find(&ret).Error
return ret, err
}

// IsAvailable 判断用户是否有指定Bucekt的权限
func (db *BucketDB) IsAvailable(ctx SQLContext, bucketID cdssdk.BucketID, userID cdssdk.UserID) (bool, error) {
_, err := db.GetUserBucket(ctx, userID, bucketID)
@@ -112,6 +124,25 @@ func (db *BucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketName stri
return newBucket.BucketID, nil
}

func (db *BucketDB) Rename(ctx SQLContext, bucketID cdssdk.BucketID, bucketName string) error {
return ctx.Table("Bucket").Where("BucketID = ?", bucketID).Update("Name", bucketName).Error
}

func (db *BucketDB) Delete(ctx SQLContext, bucketID cdssdk.BucketID) error {
return ctx.Delete(&cdssdk.Bucket{}, "BucketID = ?", bucketID).Error
}

func (db *BucketDB) DeleteComplete(tx SQLContext, bucketID cdssdk.BucketID) error {
pkgs, err := db.Package().GetBucketPackages(tx, bucketID)
if err != nil {
return err
}

for _, pkg := range pkgs {
err := db.Package().DeleteComplete(tx, pkg.PackageID)
if err != nil {
return err
}
}
return db.Bucket().Delete(tx, bucketID)
}

+ 3
- 3
common/pkgs/db2/object.go View File

@@ -26,9 +26,9 @@ func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (cdssdk.Ob
return ret, err
}

func (db *ObjectDB) GetByPath(ctx SQLContext, packageID cdssdk.PackageID, path string) ([]cdssdk.Object, error) {
var ret []cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ? AND Path = ?", packageID, path).Find(&ret).Error
func (db *ObjectDB) GetByPath(ctx SQLContext, packageID cdssdk.PackageID, path string) (cdssdk.Object, error) {
var ret cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ? AND Path = ?", packageID, path).First(&ret).Error
return ret, err
}



+ 2
- 1
coordinator/internal/mq/object.go View File

@@ -71,10 +71,11 @@ func (svc *Service) GetObjectsByPath(msg *coormq.GetObjectsByPath) (*coormq.GetO
return fmt.Errorf("getting objects with prefix: %w", err)
}
} else {
objs, err = svc.db2.Object().GetByPath(tx, msg.PackageID, msg.Path)
obj, err := svc.db2.Object().GetByPath(tx, msg.PackageID, msg.Path)
if err != nil {
return fmt.Errorf("getting object by path: %w", err)
}
objs = append(objs, obj)
}

return nil


Loading…
Cancel
Save