Browse Source

统一管理storage相关的所有服务

gitlink
Sydonian 1 year ago
parent
commit
65793ddde7
24 changed files with 662 additions and 343 deletions
  1. +16
    -0
      agent/internal/cmd/cmd.go
  2. +245
    -0
      agent/internal/cmd/serve.go
  3. +8
    -3
      agent/internal/config/config.go
  4. +7
    -7
      agent/internal/mq/cache.go
  5. +6
    -6
      agent/internal/mq/service.go
  6. +3
    -3
      agent/internal/task/cache_move_package.go
  7. +3
    -3
      agent/internal/task/storage_load_package.go
  8. +12
    -12
      agent/internal/task/task.go
  9. +2
    -210
      agent/main.go
  10. +6
    -0
      common/pkgs/db2/storage.go
  11. +11
    -11
      common/pkgs/ioswitch2/ops2/shard_store.go
  12. +11
    -11
      common/pkgs/ioswitchlrc/ops2/shard_store.go
  13. +3
    -2
      common/pkgs/mq/agent/server.go
  14. +30
    -0
      common/pkgs/mq/coordinator/node.go
  15. +10
    -0
      common/pkgs/storage/mgr/create_components.go
  16. +27
    -0
      common/pkgs/storage/mgr/create_shardstore.go
  17. +10
    -0
      common/pkgs/storage/mgr/create_sharedstore.go
  18. +169
    -0
      common/pkgs/storage/mgr/mgr.go
  19. +0
    -68
      common/pkgs/storage/shard/pool/pool.go
  20. +6
    -4
      common/pkgs/storage/shard/storages/local/local.go
  21. +2
    -3
      common/pkgs/storage/shard/types/shardstore.go
  22. +7
    -0
      common/pkgs/storage/shared/shared.go
  23. +12
    -0
      common/pkgs/storage/types/types.go
  24. +56
    -0
      coordinator/internal/mq/node.go

+ 16
- 0
agent/internal/cmd/cmd.go View File

@@ -0,0 +1,16 @@
package cmd

import "github.com/spf13/cobra"

var RootCmd = &cobra.Command{
Use: "agent",
}

func init() {
var configPath string
RootCmd.Flags().StringVarP(&configPath, "config", "c", "", "path to config file")

RootCmd.Run = func(cmd *cobra.Command, args []string) {
serve(configPath)
}
}

+ 245
- 0
agent/internal/cmd/serve.go View File

@@ -0,0 +1,245 @@
package cmd

import (
"fmt"
"net"
"os"
"time"

"gitlink.org.cn/cloudream/storage/agent/internal/http"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/agent/internal/config"
"gitlink.org.cn/cloudream/storage/agent/internal/task"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/accessstat"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"

"google.golang.org/grpc"

agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"

grpcsvc "gitlink.org.cn/cloudream/storage/agent/internal/grpc"
cmdsvc "gitlink.org.cn/cloudream/storage/agent/internal/mq"
)

func serve(configPath string) {
err := config.Init(configPath)
if err != nil {
fmt.Printf("init config failed, err: %s", err.Error())
os.Exit(1)
}

err = logger.Init(&config.Cfg().Logger)
if err != nil {
fmt.Printf("init logger failed, err: %s", err.Error())
os.Exit(1)
}

stgglb.InitLocal(&config.Cfg().Local)
stgglb.InitMQPool(&config.Cfg().RabbitMQ)
stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{})

hubCfg := downloadHubConfig()

stgMgr := mgr.NewManager()
for _, stg := range hubCfg.Storages {
err := stgMgr.InitStorage(stg)
if err != nil {
fmt.Printf("init storage %v: %v", stg, err)
os.Exit(1)
}
}

sw := exec.NewWorker()

httpSvr, err := http.NewServer(config.Cfg().ListenAddr, http.NewService(&sw))
if err != nil {
logger.Fatalf("new http server failed, err: %s", err.Error())
}
go serveHTTP(httpSvr)

// 启动网络连通性检测,并就地检测一次
conCol := connectivity.NewCollector(&config.Cfg().Connectivity, func(collector *connectivity.Collector) {
log := logger.WithField("Connectivity", "")

coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
log.Warnf("acquire coordinator mq failed, err: %s", err.Error())
return
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

cons := collector.GetAll()
nodeCons := make([]cdssdk.NodeConnectivity, 0, len(cons))
for _, con := range cons {
var delay *float32
if con.Delay != nil {
v := float32(con.Delay.Microseconds()) / 1000
delay = &v
}

nodeCons = append(nodeCons, cdssdk.NodeConnectivity{
FromNodeID: *stgglb.Local.NodeID,
ToNodeID: con.ToNodeID,
Delay: delay,
TestTime: con.TestTime,
})
}

_, err = coorCli.UpdateNodeConnectivities(coormq.ReqUpdateNodeConnectivities(nodeCons))
if err != nil {
log.Warnf("update node connectivities: %v", err)
}
})
conCol.CollectInPlace()

acStat := accessstat.NewAccessStat(accessstat.Config{
// TODO 考虑放到配置里
ReportInterval: time.Second * 10,
})
go serveAccessStat(acStat)

distlock, err := distlock.NewService(&config.Cfg().DistLock)
if err != nil {
logger.Fatalf("new ipfs failed, err: %s", err.Error())
}

dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol)

taskMgr := task.NewManager(distlock, &conCol, &dlder, acStat, stgMgr)

// 启动命令服务器
// TODO 需要设计AgentID持久化机制
agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr, stgMgr), config.Cfg().ID, &config.Cfg().RabbitMQ)
if err != nil {
logger.Fatalf("new agent server failed, err: %s", err.Error())
}
agtSvr.OnError(func(err error) {
logger.Warnf("agent server err: %s", err.Error())
})
go serveAgentServer(agtSvr)

//面向客户端收发数据
listenAddr := config.Cfg().GRPC.MakeListenAddress()
lis, err := net.Listen("tcp", listenAddr)
if err != nil {
logger.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error())
}
s := grpc.NewServer()
agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&sw))
go serveGRPC(s, lis)

go serveDistLock(distlock)

foever := make(chan struct{})
<-foever
}

func downloadHubConfig() coormq.GetHubConfigResp {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
logger.Errorf("new coordinator client: %v", err)
os.Exit(1)
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

cfgResp, err := coorCli.GetHubConfig(coormq.ReqGetHubConfig(cdssdk.NodeID(config.Cfg().ID)))
if err != nil {
logger.Errorf("getting hub config: %v", err)
os.Exit(1)
}

return *cfgResp
}

func serveAgentServer(server *agtmq.Server) {
logger.Info("start serving command server")

err := server.Serve()

if err != nil {
logger.Errorf("command server stopped with error: %s", err.Error())
}

logger.Info("command server stopped")

// TODO 仅简单结束了程序
os.Exit(1)
}

func serveGRPC(s *grpc.Server, lis net.Listener) {
logger.Info("start serving grpc")

err := s.Serve(lis)

if err != nil {
logger.Errorf("grpc stopped with error: %s", err.Error())
}

logger.Info("grpc stopped")

// TODO 仅简单结束了程序
os.Exit(1)
}

func serveHTTP(server *http.Server) {
logger.Info("start serving http")

err := server.Serve()

if err != nil {
logger.Errorf("http stopped with error: %s", err.Error())
}

logger.Info("http stopped")

// TODO 仅简单结束了程序
os.Exit(1)
}

func serveDistLock(svc *distlock.Service) {
logger.Info("start serving distlock")

err := svc.Serve()

if err != nil {
logger.Errorf("distlock stopped with error: %s", err.Error())
}

logger.Info("distlock stopped")

// TODO 仅简单结束了程序
os.Exit(1)
}

func serveAccessStat(svc *accessstat.AccessStat) {
logger.Info("start serving access stat")

ch := svc.Start()
loop:
for {
val, err := ch.Receive()
if err != nil {
logger.Errorf("access stat stopped with error: %v", err)
break
}

switch val := val.(type) {
case error:
logger.Errorf("access stat stopped with error: %v", val)
break loop
}
}
logger.Info("access stat stopped")

// TODO 仅简单结束了程序
os.Exit(1)
}

+ 8
- 3
agent/internal/config/config.go View File

@@ -3,6 +3,7 @@ package config
import (
"gitlink.org.cn/cloudream/common/pkgs/distlock"
log "gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
c "gitlink.org.cn/cloudream/common/utils/config"
stgmodels "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
@@ -12,7 +13,7 @@ import (
)

type Config struct {
ID int64 `json:"id"`
ID cdssdk.NodeID `json:"id"`
ListenAddr string `json:"listenAddr"`
Local stgmodels.LocalMachineInfo `json:"local"`
GRPC *grpc.Config `json:"grpc"`
@@ -25,8 +26,12 @@ type Config struct {

var cfg Config

func Init() error {
return c.DefaultLoad("agent", &cfg)
func Init(path string) error {
if path == "" {
return c.DefaultLoad("agent", &cfg)
}

return c.Load(path, &cfg)
}

func Cfg() *Config {


+ 7
- 7
agent/internal/mq/cache.go View File

@@ -12,9 +12,9 @@ import (
)

func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *mq.CodeMessage) {
store := svc.shardStorePool.Get(msg.StorageID)
if store == nil {
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("storage %v has no shard store", msg.StorageID))
store, err := svc.stgMgr.GetShardStore(msg.StorageID)
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("get shard store of storage %v: %v", msg.StorageID, err))
}

infos, err := store.ListAll()
@@ -31,12 +31,12 @@ func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *m
}

func (svc *Service) CacheGC(msg *agtmq.CacheGC) (*agtmq.CacheGCResp, *mq.CodeMessage) {
store := svc.shardStorePool.Get(msg.StorageID)
if store == nil {
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("storage %v has no shard store", msg.StorageID))
store, err := svc.stgMgr.GetShardStore(msg.StorageID)
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("get shard store of storage %v: %v", msg.StorageID, err))
}

err := store.Purge(msg.Avaiables)
err = store.Purge(msg.Avaiables)
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("purging cache: %v", err))
}


+ 6
- 6
agent/internal/mq/service.go View File

@@ -2,17 +2,17 @@ package mq

import (
"gitlink.org.cn/cloudream/storage/agent/internal/task"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/pool"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
)

type Service struct {
taskManager *task.Manager
shardStorePool *pool.ShardStorePool
taskManager *task.Manager
stgMgr *mgr.Manager
}

func NewService(taskMgr *task.Manager, shardStorePool *pool.ShardStorePool) *Service {
func NewService(taskMgr *task.Manager, stgMgr *mgr.Manager) *Service {
return &Service{
taskManager: taskMgr,
shardStorePool: shardStorePool,
taskManager: taskMgr,
stgMgr: stgMgr,
}
}

+ 3
- 3
agent/internal/task/cache_move_package.go View File

@@ -40,9 +40,9 @@ func (t *CacheMovePackage) do(ctx TaskContext) error {
log.Debugf("begin with %v", logger.FormatStruct(t))
defer log.Debugf("end")

store := ctx.shardStorePool.Get(t.storageID)
if store == nil {
return fmt.Errorf("storage has no shard store")
store, err := ctx.stgMgr.GetShardStore(t.storageID)
if err != nil {
return fmt.Errorf("get shard store of storage %v: %w", t.storageID, err)
}

mutex, err := reqbuilder.NewBuilder().


+ 3
- 3
agent/internal/task/storage_load_package.go View File

@@ -94,9 +94,9 @@ func (t *StorageLoadPackage) do(task *task.Task[TaskContext], ctx TaskContext) e
return fmt.Errorf("getting package object details: %w", err)
}

shardstore := ctx.shardStorePool.Get(t.storageID)
if shardstore == nil {
return fmt.Errorf("shard store %v not found on this hub", t.storageID)
shardstore, err := ctx.stgMgr.GetShardStore(t.storageID)
if err != nil {
return fmt.Errorf("get shard store of storage %v: %w", t.storageID, err)
}

mutex, err := reqbuilder.NewBuilder().


+ 12
- 12
agent/internal/task/task.go View File

@@ -6,16 +6,16 @@ import (
"gitlink.org.cn/cloudream/storage/common/pkgs/accessstat"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/pool"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
)

// TaskContext 定义了任务执行的上下文环境,包含分布式锁服务、IO开关和网络连接状态收集器
type TaskContext struct {
distlock *distlock.Service
connectivity *connectivity.Collector
downloader *downloader.Downloader
accessStat *accessstat.AccessStat
shardStorePool *pool.ShardStorePool
distlock *distlock.Service
connectivity *connectivity.Collector
downloader *downloader.Downloader
accessStat *accessstat.AccessStat
stgMgr *mgr.Manager
}

// CompleteFn 类型定义了任务完成时需要执行的函数,用于设置任务的执行结果
@@ -33,12 +33,12 @@ type Task = task.Task[TaskContext]
// CompleteOption 类型定义了任务完成时的选项,可用于定制化任务完成的处理方式
type CompleteOption = task.CompleteOption

func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, shardStorePool *pool.ShardStorePool) Manager {
func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgMgr *mgr.Manager) Manager {
return task.NewManager(TaskContext{
distlock: distlock,
connectivity: connectivity,
downloader: downloader,
accessStat: accessStat,
shardStorePool: shardStorePool,
distlock: distlock,
connectivity: connectivity,
downloader: downloader,
accessStat: accessStat,
stgMgr: stgMgr,
})
}

+ 2
- 210
agent/main.go View File

@@ -1,215 +1,7 @@
package main

import (
"fmt"
"net"
"os"
"time"

"gitlink.org.cn/cloudream/storage/agent/internal/http"

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/agent/internal/config"
"gitlink.org.cn/cloudream/storage/agent/internal/task"
stgglb "gitlink.org.cn/cloudream/storage/common/globals"
"gitlink.org.cn/cloudream/storage/common/pkgs/accessstat"
"gitlink.org.cn/cloudream/storage/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage/common/pkgs/distlock"
"gitlink.org.cn/cloudream/storage/common/pkgs/downloader"
agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/pool"

"google.golang.org/grpc"

agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"

grpcsvc "gitlink.org.cn/cloudream/storage/agent/internal/grpc"
cmdsvc "gitlink.org.cn/cloudream/storage/agent/internal/mq"
)

// TODO 此数据是否在运行时会发生变化?
var AgentIpList []string
import "gitlink.org.cn/cloudream/storage/agent/internal/cmd"

func main() {
// TODO 放到配置里读取
AgentIpList = []string{"pcm01", "pcm1", "pcm2"}

err := config.Init()
if err != nil {
fmt.Printf("init config failed, err: %s", err.Error())
os.Exit(1)
}

err = logger.Init(&config.Cfg().Logger)
if err != nil {
fmt.Printf("init logger failed, err: %s", err.Error())
os.Exit(1)
}

stgglb.InitLocal(&config.Cfg().Local)
stgglb.InitMQPool(&config.Cfg().RabbitMQ)
stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{})

sw := exec.NewWorker()

svc := http.NewService(&sw)
if err != nil {
logger.Fatalf("new http service failed, err: %s", err.Error())
}
server, err := http.NewServer(config.Cfg().ListenAddr, svc)
err = server.Serve()
if err != nil {
logger.Fatalf("http server stopped with error: %s", err.Error())
}

// 启动网络连通性检测,并就地检测一次
conCol := connectivity.NewCollector(&config.Cfg().Connectivity, func(collector *connectivity.Collector) {
log := logger.WithField("Connectivity", "")

coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
log.Warnf("acquire coordinator mq failed, err: %s", err.Error())
return
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

cons := collector.GetAll()
nodeCons := make([]cdssdk.NodeConnectivity, 0, len(cons))
for _, con := range cons {
var delay *float32
if con.Delay != nil {
v := float32(con.Delay.Microseconds()) / 1000
delay = &v
}

nodeCons = append(nodeCons, cdssdk.NodeConnectivity{
FromNodeID: *stgglb.Local.NodeID,
ToNodeID: con.ToNodeID,
Delay: delay,
TestTime: con.TestTime,
})
}

_, err = coorCli.UpdateNodeConnectivities(coormq.ReqUpdateNodeConnectivities(nodeCons))
if err != nil {
log.Warnf("update node connectivities: %v", err)
}
})
conCol.CollectInPlace()

acStat := accessstat.NewAccessStat(accessstat.Config{
// TODO 考虑放到配置里
ReportInterval: time.Second * 10,
})
go serveAccessStat(acStat)

// TODO2 根据配置实例化Store并加入到Pool中
shardStorePool := pool.New()

distlock, err := distlock.NewService(&config.Cfg().DistLock)
if err != nil {
logger.Fatalf("new ipfs failed, err: %s", err.Error())
}

dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol)

taskMgr := task.NewManager(distlock, &conCol, &dlder, acStat, shardStorePool)

// 启动命令服务器
// TODO 需要设计AgentID持久化机制
agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr, shardStorePool), config.Cfg().ID, &config.Cfg().RabbitMQ)
if err != nil {
logger.Fatalf("new agent server failed, err: %s", err.Error())
}
agtSvr.OnError(func(err error) {
logger.Warnf("agent server err: %s", err.Error())
})
go serveAgentServer(agtSvr)

//面向客户端收发数据
listenAddr := config.Cfg().GRPC.MakeListenAddress()
lis, err := net.Listen("tcp", listenAddr)
if err != nil {
logger.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error())
}
s := grpc.NewServer()
agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&sw))
go serveGRPC(s, lis)

go serveDistLock(distlock)

foever := make(chan struct{})
<-foever
}

func serveAgentServer(server *agtmq.Server) {
logger.Info("start serving command server")

err := server.Serve()

if err != nil {
logger.Errorf("command server stopped with error: %s", err.Error())
}

logger.Info("command server stopped")

// TODO 仅简单结束了程序
os.Exit(1)
}

func serveGRPC(s *grpc.Server, lis net.Listener) {
logger.Info("start serving grpc")

err := s.Serve(lis)

if err != nil {
logger.Errorf("grpc stopped with error: %s", err.Error())
}

logger.Info("grpc stopped")

// TODO 仅简单结束了程序
os.Exit(1)
}

func serveDistLock(svc *distlock.Service) {
logger.Info("start serving distlock")

err := svc.Serve()

if err != nil {
logger.Errorf("distlock stopped with error: %s", err.Error())
}

logger.Info("distlock stopped")

// TODO 仅简单结束了程序
os.Exit(1)
}

func serveAccessStat(svc *accessstat.AccessStat) {
logger.Info("start serving access stat")

ch := svc.Start()
loop:
for {
val, err := ch.Receive()
if err != nil {
logger.Errorf("access stat stopped with error: %v", err)
break
}

switch val := val.(type) {
case error:
logger.Errorf("access stat stopped with error: %v", val)
break loop
}
}
logger.Info("access stat stopped")

// TODO 仅简单结束了程序
os.Exit(1)
cmd.RootCmd.Execute()
}

+ 6
- 0
common/pkgs/db2/storage.go View File

@@ -76,3 +76,9 @@ func (db *StorageDB) GetUserStorageByName(ctx SQLContext, userID cdssdk.UserID,

return stg, err
}

func (db *StorageDB) GetHubStorages(ctx SQLContext, hubID cdssdk.NodeID) ([]model.Storage, error) {
var stgs []model.Storage
err := ctx.Table("Storage").Select("Storage.*").Find(&stgs, "NodeID = ?", hubID).Error
return stgs, err
}

+ 11
- 11
common/pkgs/ioswitch2/ops2/shard_store.go View File

@@ -10,7 +10,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/pool"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types"
)

@@ -40,14 +40,14 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("reading from shard store")
defer logger.Debugf("reading from shard store finished")

pool, err := exec.ValueByType[*pool.ShardStorePool](ctx)
stgMgr, err := exec.ValueByType[*mgr.Manager](ctx)
if err != nil {
return fmt.Errorf("getting shard store pool: %w", err)
return fmt.Errorf("getting storage manager: %w", err)
}

store := pool.Get(o.StorageID)
if store == nil {
return fmt.Errorf("shard store %v not found", o.StorageID)
store, err := stgMgr.GetShardStore(o.StorageID)
if err != nil {
return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err)
}

file, err := store.Open(o.Open)
@@ -82,14 +82,14 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("writting file to shard store")
defer logger.Debugf("write to shard store finished")

pool, err := exec.ValueByType[*pool.ShardStorePool](ctx)
stgMgr, err := exec.ValueByType[*mgr.Manager](ctx)
if err != nil {
return fmt.Errorf("getting shard store pool: %w", err)
return fmt.Errorf("getting storage manager: %w", err)
}

store := pool.Get(o.StorageID)
if store == nil {
return fmt.Errorf("shard store %v not found", o.StorageID)
store, err := stgMgr.GetShardStore(o.StorageID)
if err != nil {
return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err)
}

input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input)


+ 11
- 11
common/pkgs/ioswitchlrc/ops2/shard_store.go View File

@@ -10,7 +10,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/pool"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/mgr"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types"
)

@@ -40,14 +40,14 @@ func (o *ShardRead) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("reading from shard store")
defer logger.Debugf("reading from shard store finished")

pool, err := exec.ValueByType[*pool.ShardStorePool](ctx)
stgMgr, err := exec.ValueByType[*mgr.Manager](ctx)
if err != nil {
return fmt.Errorf("getting shard store pool: %w", err)
return fmt.Errorf("getting storage manager: %w", err)
}

store := pool.Get(o.StorageID)
if store == nil {
return fmt.Errorf("shard store %v not found", o.StorageID)
store, err := stgMgr.GetShardStore(o.StorageID)
if err != nil {
return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err)
}

file, err := store.Open(o.Open)
@@ -82,14 +82,14 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
Debugf("writting file to shard store")
defer logger.Debugf("write to shard store finished")

pool, err := exec.ValueByType[*pool.ShardStorePool](ctx)
stgMgr, err := exec.ValueByType[*mgr.Manager](ctx)
if err != nil {
return fmt.Errorf("getting shard store pool: %w", err)
return fmt.Errorf("getting storage manager: %w", err)
}

store := pool.Get(o.StorageID)
if store == nil {
return fmt.Errorf("shard store %v not found", o.StorageID)
store, err := stgMgr.GetShardStore(o.StorageID)
if err != nil {
return fmt.Errorf("getting shard store of storage %v: %w", o.StorageID, err)
}

input, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input)


+ 3
- 2
common/pkgs/mq/agent/server.go View File

@@ -2,6 +2,7 @@ package agent

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
mymq "gitlink.org.cn/cloudream/storage/common/pkgs/mq"
)

@@ -18,14 +19,14 @@ type Server struct {
rabbitSvr mq.RabbitMQServer
}

func NewServer(svc Service, id int64, cfg *mymq.Config) (*Server, error) {
func NewServer(svc Service, id cdssdk.NodeID, cfg *mymq.Config) (*Server, error) {
srv := &Server{
service: svc,
}

rabbitSvr, err := mq.NewRabbitMQServer(
cfg.MakeConnectingURL(),
mymq.MakeAgentQueueName(id),
mymq.MakeAgentQueueName(int64(id)),
func(msg *mq.Message) (*mq.Message, error) {
return msgDispatcher.Handle(srv.service, msg)
},


+ 30
- 0
common/pkgs/mq/coordinator/node.go View File

@@ -3,9 +3,12 @@ package coordinator
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
)

type NodeService interface {
GetHubConfig(msg *GetHubConfig) (*GetHubConfigResp, *mq.CodeMessage)

GetUserNodes(msg *GetUserNodes) (*GetUserNodesResp, *mq.CodeMessage)

GetNodes(msg *GetNodes) (*GetNodesResp, *mq.CodeMessage)
@@ -15,6 +18,33 @@ type NodeService interface {
UpdateNodeConnectivities(msg *UpdateNodeConnectivities) (*UpdateNodeConnectivitiesResp, *mq.CodeMessage)
}

var _ = Register(Service.GetHubConfig)

type GetHubConfig struct {
mq.MessageBodyBase
HubID cdssdk.NodeID `json:"hubID"`
}
type GetHubConfigResp struct {
mq.MessageBodyBase
Hub cdssdk.Node `json:"hub"`
Storages []stgmod.StorageDetail `json:"storages"`
}

func ReqGetHubConfig(hubID cdssdk.NodeID) *GetHubConfig {
return &GetHubConfig{
HubID: hubID,
}
}
func RespGetHubConfig(hub cdssdk.Node, storages []stgmod.StorageDetail) *GetHubConfigResp {
return &GetHubConfigResp{
Hub: hub,
Storages: storages,
}
}
func (client *Client) GetHubConfig(msg *GetHubConfig) (*GetHubConfigResp, error) {
return mq.Request(Service.GetHubConfig, client.rabbitCli, msg)
}

// 查询用户可用的节点
var _ = Register(Service.GetUserNodes)



+ 10
- 0
common/pkgs/storage/mgr/create_components.go View File

@@ -0,0 +1,10 @@
package mgr

import (
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

func createComponents(detail stgmod.StorageDetail, ch *types.StorageEventChan, stg *storage) error {
return nil
}

+ 27
- 0
common/pkgs/storage/mgr/create_shardstore.go View File

@@ -0,0 +1,27 @@
package mgr

import (
"fmt"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/storages/local"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

func createShardStore(detail stgmod.StorageDetail, ch *types.StorageEventChan, stg *storage) error {
switch confg := detail.Shard.Config.(type) {
case *cdssdk.LocalShardStorage:
store, err := local.New(detail.Storage, *confg)
if err != nil {
return fmt.Errorf("new local shard store: %v", err)
}

store.Start(ch)
stg.Shard = store
return nil

default:
return fmt.Errorf("unsupported shard store type: %T", confg)
}
}

+ 10
- 0
common/pkgs/storage/mgr/create_sharedstore.go View File

@@ -0,0 +1,10 @@
package mgr

import (
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

func createSharedStore(detail stgmod.StorageDetail, ch *types.StorageEventChan, stg *storage) error {
return nil
}

+ 169
- 0
common/pkgs/storage/mgr/mgr.go View File

@@ -0,0 +1,169 @@
package mgr

import (
"errors"
"reflect"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/async"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/reflect2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shared"
stypes "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

var ErrStorageNotFound = errors.New("storage not found")

var ErrComponentNotFound = errors.New("component not found")

var ErrStorageExists = errors.New("storage already exists")

type storage struct {
Shard types.ShardStore
Shared shared.SharedStore
Components []stypes.StorageComponent
}

type Manager struct {
storages map[cdssdk.StorageID]*storage
lock sync.Mutex
eventChan *stypes.StorageEventChan
}

func NewManager() *Manager {
return &Manager{
storages: make(map[cdssdk.StorageID]*storage),
eventChan: async.NewUnboundChannel[stypes.StorageEvent](),
}
}

func (m *Manager) InitStorage(detail stgmod.StorageDetail) error {
m.lock.Lock()
defer m.lock.Unlock()

if _, ok := m.storages[detail.Storage.StorageID]; ok {
return ErrStorageExists
}

stg := &storage{}

if detail.Shard != nil {
err := createShardStore(detail, m.eventChan, stg)
if err != nil {
stopStorage(stg)
return err
}
}

if detail.Shared != nil {
err := createSharedStore(detail, m.eventChan, stg)
if err != nil {
stopStorage(stg)
return err
}
}

// 创建其他组件
err := createComponents(detail, m.eventChan, stg)
if err != nil {
stopStorage(stg)
return err
}

m.storages[detail.Storage.StorageID] = stg
return nil
}

func stopStorage(stg *storage) {
if stg.Shard != nil {
stg.Shard.Stop()
}

if stg.Shared != nil {
stg.Shared.Stop()
}

for _, c := range stg.Components {
c.Stop()
}
}

// 查找指定Storage的ShardStore组件
func (m *Manager) GetShardStore(stgID cdssdk.StorageID) (types.ShardStore, error) {
m.lock.Lock()
defer m.lock.Unlock()

stg := m.storages[stgID]
if stg == nil {
return nil, ErrStorageNotFound
}

if stg.Shard == nil {
return nil, ErrComponentNotFound
}

return stg.Shard, nil
}

// 查找指定Storage的SharedStore组件
func (m *Manager) GetSharedStore(stgID cdssdk.StorageID) (shared.SharedStore, error) {
m.lock.Lock()
defer m.lock.Unlock()

stg := m.storages[stgID]
if stg == nil {
return nil, ErrStorageNotFound
}

if stg.Shared == nil {
return nil, ErrComponentNotFound
}

return stg.Shared, nil
}

// 查找指定Storage的指定类型的组件,可以是ShardStore、SharedStore、或者其他自定义的组件
func (m *Manager) GetComponent(stgID cdssdk.StorageID, typ reflect.Type) (stypes.StorageComponent, error) {
m.lock.Lock()
defer m.lock.Unlock()

stg := m.storages[stgID]
if stg == nil {
return nil, ErrStorageNotFound
}

switch typ {
case reflect2.TypeOf[types.ShardStore]():
if stg.Shard == nil {
return nil, ErrComponentNotFound
}

return stg.Shard, nil
case reflect2.TypeOf[shared.SharedStore]():
if stg.Shared == nil {
return nil, ErrComponentNotFound
}

return stg.Shared, nil
default:
for _, c := range stg.Components {
if reflect.TypeOf(c) == typ {
return c, nil
}
}

return nil, ErrComponentNotFound
}
}

func GetComponent[T stypes.StorageComponent](mgr *Manager, stgID cdssdk.StorageID) (T, error) {
ret, err := mgr.GetComponent(stgID, reflect2.TypeOf[T]())
if err != nil {
var def T
return def, err
}

return ret.(T), nil
}

+ 0
- 68
common/pkgs/storage/shard/pool/pool.go View File

@@ -1,68 +0,0 @@
package pool

import (
"fmt"
"sync"

"gitlink.org.cn/cloudream/common/pkgs/async"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/storages/local"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types"
)

type ShardStorePool struct {
stores map[cdssdk.StorageID]*shardStore
lock sync.Mutex
}

func New() *ShardStorePool {
return &ShardStorePool{
stores: make(map[cdssdk.StorageID]*shardStore),
}
}

func (p *ShardStorePool) PutNew(stg cdssdk.Storage, config cdssdk.ShardStoreConfig) error {
p.lock.Lock()
defer p.lock.Unlock()

switch confg := config.(type) {
case *cdssdk.LocalShardStorage:
if _, ok := p.stores[stg.StorageID]; ok {
return fmt.Errorf("storage %s already exists", stg.StorageID)
}

store, err := local.New(stg, *confg)
if err != nil {
return fmt.Errorf("new local shard store: %v", err)
}

ch := store.Start()

p.stores[stg.StorageID] = &shardStore{
Store: store,
EventChan: ch,
}
return nil

default:
return fmt.Errorf("unsupported shard store type: %T", confg)
}
}

// 不存在时返回nil
func (p *ShardStorePool) Get(stgID cdssdk.StorageID) types.ShardStore {
p.lock.Lock()
defer p.lock.Unlock()

store, ok := p.stores[stgID]
if !ok {
return nil
}

return store.Store
}

type shardStore struct {
Store types.ShardStore
EventChan *async.UnboundChannel[types.StoreEvent]
}

+ 6
- 4
common/pkgs/storage/shard/storages/local/local.go View File

@@ -8,12 +8,12 @@ import (
"os"
"path/filepath"

"gitlink.org.cn/cloudream/common/pkgs/async"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/storages/utils"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/shard/types"
stypes "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

const (
@@ -36,9 +36,11 @@ func New(stg cdssdk.Storage, cfg cdssdk.LocalShardStorage) (*Local, error) {
}, nil
}

func (s *Local) Start() *async.UnboundChannel[types.StoreEvent] {
// TODO 暂时没有需要后台执行的任务
return async.NewUnboundChannel[types.StoreEvent]()
func (s *Local) Start(ch *stypes.StorageEventChan) {

}

func (s *Local) Stop() {
}

func (s *Local) New() types.Writer {


+ 2
- 3
common/pkgs/storage/shard/types/shardstore.go View File

@@ -3,8 +3,8 @@ package types
import (
"io"

"gitlink.org.cn/cloudream/common/pkgs/async"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"
)

type Status interface {
@@ -23,8 +23,7 @@ type StoreEvent interface {
}

type ShardStore interface {
// 启动服务
Start() *async.UnboundChannel[StoreEvent]
types.StorageComponent
// 准备写入一个新文件,写入后获得FileHash
New() Writer
// 使用F函数创建Option对象


+ 7
- 0
common/pkgs/storage/shared/shared.go View File

@@ -0,0 +1,7 @@
package shared

import "gitlink.org.cn/cloudream/storage/common/pkgs/storage/types"

type SharedStore interface {
types.StorageComponent
}

+ 12
- 0
common/pkgs/storage/types/types.go View File

@@ -0,0 +1,12 @@
package types

import "gitlink.org.cn/cloudream/common/pkgs/async"

type StorageEvent interface{}

type StorageEventChan = async.UnboundChannel[StorageEvent]

type StorageComponent interface {
Start(ch *StorageEventChan)
Stop()
}

+ 56
- 0
coordinator/internal/mq/node.go View File

@@ -2,6 +2,8 @@ package mq

import (
"fmt"

stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2"

"gitlink.org.cn/cloudream/common/consts/errorcode"
@@ -11,6 +13,60 @@ import (
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

func (svc *Service) GetHubConfig(msg *coormq.GetHubConfig) (*coormq.GetHubConfigResp, *mq.CodeMessage) {
log := logger.WithField("HubID", msg.HubID)

hub, err := svc.db2.Node().GetByID(svc.db2.DefCtx(), msg.HubID)
if err != nil {
log.Warnf("getting hub: %v", err)
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("getting hub: %v", err))
}

detailsMap := make(map[cdssdk.StorageID]*stgmod.StorageDetail)

stgs, err := svc.db2.Storage().GetHubStorages(svc.db2.DefCtx(), msg.HubID)
if err != nil {
log.Warnf("getting hub storages: %v", err)
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("getting hub storages: %v", err))
}

var stgIDs []cdssdk.StorageID
for _, stg := range stgs {
detailsMap[stg.StorageID] = &stgmod.StorageDetail{
Storage: stg,
MasterHub: &hub,
}
stgIDs = append(stgIDs, stg.StorageID)
}

shards, err := svc.db2.ShardStorage().BatchGetByStorageIDs(svc.db2.DefCtx(), stgIDs)
if err != nil {
log.Warnf("getting shard storages: %v", err)
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("getting shard storages: %v", err))
}
for _, shard := range shards {
sh := shard
detailsMap[shard.StorageID].Shard = &sh
}

shareds, err := svc.db2.SharedStorage().BatchGetByStorageIDs(svc.db2.DefCtx(), stgIDs)
if err != nil {
log.Warnf("getting shared storages: %v", err)
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("getting shared storages: %v", err))
}
for _, shared := range shareds {
sh := shared
detailsMap[shared.StorageID].Shared = &sh
}

var details []stgmod.StorageDetail
for _, detail := range detailsMap {
details = append(details, *detail)
}

return mq.ReplyOK(coormq.RespGetHubConfig(hub, details))
}

func (svc *Service) GetUserNodes(msg *coormq.GetUserNodes) (*coormq.GetUserNodesResp, *mq.CodeMessage) {
nodes, err := svc.db2.Node().GetUserNodes(svc.db2.DefCtx(), msg.UserID)
if err != nil {


Loading…
Cancel
Save