Browse Source

agent重命名为hub

gitlink
Sydonian 11 months ago
parent
commit
3dc11f3515
32 changed files with 95 additions and 95 deletions
  1. +4
    -4
      client/internal/cmdline/mount.go
  2. +4
    -4
      client/internal/cmdline/serve.go
  3. +2
    -2
      client/internal/config/config.go
  4. +3
    -3
      client/internal/services/cache.go
  5. +1
    -1
      client/internal/services/storage.go
  6. +7
    -7
      common/globals/pools.go
  7. +0
    -0
      common/pkgs/grpc/hub/client.go
  8. +0
    -0
      common/pkgs/grpc/hub/hub.pb.go
  9. +0
    -0
      common/pkgs/grpc/hub/hub.proto
  10. +0
    -0
      common/pkgs/grpc/hub/hub_grpc.pb.go
  11. +0
    -0
      common/pkgs/grpc/hub/pool.go
  12. +2
    -2
      common/pkgs/ioswitch2/hub_worker.go
  13. +4
    -4
      common/pkgs/ioswitch2/ops2/faas.go
  14. +2
    -2
      common/pkgs/ioswitchlrc/hub_worker.go
  15. +0
    -0
      common/pkgs/mq/hub/client.go
  16. +0
    -0
      common/pkgs/mq/hub/hub.go
  17. +0
    -0
      common/pkgs/mq/hub/server.go
  18. +0
    -0
      common/pkgs/mq/hub/storage.go
  19. +10
    -10
      hub/internal/cmd/serve.go
  20. +17
    -17
      hub/internal/grpc/io.go
  21. +5
    -5
      hub/internal/grpc/ping.go
  22. +3
    -3
      hub/internal/grpc/service.go
  23. +1
    -1
      hub/internal/http/hub_io.go
  24. +3
    -3
      hub/internal/http/service.go
  25. +0
    -10
      hub/internal/mq/agent.go
  26. +10
    -0
      hub/internal/mq/hub.go
  27. +3
    -3
      hub/internal/mq/service.go
  28. +4
    -4
      hub/internal/mq/storage.go
  29. +1
    -1
      hub/internal/task/cache_move_package.go
  30. +4
    -4
      hub/internal/task/task.go
  31. +2
    -2
      hub/internal/tickevent/report_hub_stats.go
  32. +3
    -3
      hub/internal/tickevent/report_storage_stats.go

+ 4
- 4
client/internal/cmdline/mount.go View File

@@ -18,8 +18,8 @@ import (
stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
"gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage2/common/pkgs/distlock"
agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub"
agtpool "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool"
hubrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub"
hubpool "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool"
)

func init() {
@@ -52,12 +52,12 @@ func mountCmd(mountPoint string, configPath string) {

stgglb.InitLocal(config.Cfg().Local)
stgglb.InitMQPool(config.Cfg().RabbitMQ)
stgglb.InitHubRPCPool(&agtrpc.PoolConfig{})
stgglb.InitHubRPCPool(&hubrpc.PoolConfig{})
// stgglb.Stats.SetupHubStorageTransfer(*config.Cfg().Local.HubID)
// stgglb.Stats.SetupHubTransfer(*config.Cfg().Local.HubID)

// 初始化存储服务管理器
stgPool := agtpool.NewPool()
stgPool := hubpool.NewPool()

db, err := db2.NewDB(&config.Cfg().DB)
if err != nil {


+ 4
- 4
client/internal/cmdline/serve.go View File

@@ -21,7 +21,7 @@ import (
"gitlink.org.cn/cloudream/storage2/common/models/datamap"
"gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage2/common/pkgs/distlock"
agtpool "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool"
"gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent"
)

@@ -100,16 +100,16 @@ func serveHTTP(configPath string, listenAddr string) {
go serveAccessStat(acStat)

// 存储管理器
stgAgts := agtpool.NewPool()
stgPool := pool.NewPool()

// 下载策略
strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta)

// 下载器
dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgAgts, strgSel, db)
dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgPool, strgSel, db)

// 上传器
uploader := uploader.NewUploader(distlockSvc, &conCol, stgAgts, stgMeta, db)
uploader := uploader.NewUploader(distlockSvc, &conCol, stgPool, stgMeta, db)

svc, err := services.NewService(distlockSvc, &dlder, acStat, uploader, strgSel, stgMeta, db, evtPub)
if err != nil {


+ 2
- 2
client/internal/config/config.go View File

@@ -11,12 +11,12 @@ import (
clitypes "gitlink.org.cn/cloudream/storage2/client/types"
stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
"gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity"
agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub"
hubrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub"
)

type Config struct {
Local stgglb.LocalMachineInfo `json:"local"`
HubGRPC agtrpc.PoolConfig `json:"hubGRPC"`
HubGRPC hubrpc.PoolConfig `json:"hubGRPC"`
Logger logger.Config `json:"logger"`
DB db.Config `json:"db"`
RabbitMQ mq.Config `json:"rabbitMQ"`


+ 3
- 3
client/internal/services/cache.go View File

@@ -8,7 +8,7 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub"
hubmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory"
)
@@ -43,7 +43,7 @@ func (svc *CacheService) StartCacheMovePackage(userID cdssdk.UserID, packageID c
}
defer stgglb.HubMQPool.Release(hubCli)

startResp, err := hubCli.StartCacheMovePackage(agtmq.NewStartCacheMovePackage(userID, packageID, stgID))
startResp, err := hubCli.StartCacheMovePackage(hubmq.NewStartCacheMovePackage(userID, packageID, stgID))
if err != nil {
return 0, "", fmt.Errorf("start cache move package: %w", err)
}
@@ -58,7 +58,7 @@ func (svc *CacheService) WaitCacheMovePackage(hubID cdssdk.HubID, taskID string,
}
defer stgglb.HubMQPool.Release(hubCli)

waitResp, err := hubCli.WaitCacheMovePackage(agtmq.NewWaitCacheMovePackage(taskID, waitTimeout.Milliseconds()))
waitResp, err := hubCli.WaitCacheMovePackage(hubmq.NewWaitCacheMovePackage(taskID, waitTimeout.Milliseconds()))
if err != nil {
return true, fmt.Errorf("wait cache move package: %w", err)
}


+ 1
- 1
client/internal/services/storage.go View File

@@ -144,7 +144,7 @@ func (svc *UserSpaceService) UserSpaceCreatePackage(bucketID cdssdk.BucketID, na
// }
// defer stgglb.HubMQPool.Release(hubCli)

// createResp, err := hubCli.UserSpaceCreatePackage(agtmq.ReqUserSpaceCreatePackage(bucketID, name, userspaceID, path, userspaceAffinity))
// createResp, err := hubCli.UserSpaceCreatePackage(hubmq.ReqUserSpaceCreatePackage(bucketID, name, userspaceID, path, userspaceAffinity))
// if err != nil {
// return cdssdk.Package{}, err
// }


+ 7
- 7
common/globals/pools.go View File

@@ -2,13 +2,13 @@ package stgglb

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub"
hubrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub"
hubmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub"
scmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/scanner"
)

var HubMQPool agtmq.Pool
var HubMQPool hubmq.Pool

var CoordinatorMQPool coormq.Pool

@@ -19,7 +19,7 @@ var ScannerMQPool scmq.Pool
// @Description: 初始化MQ连接池
// @param cfg
func InitMQPool(cfg mq.Config) {
HubMQPool = agtmq.NewPool(cfg)
HubMQPool = hubmq.NewPool(cfg)

CoordinatorMQPool = coormq.NewPool(cfg)

@@ -27,12 +27,12 @@ func InitMQPool(cfg mq.Config) {

}

var HubRPCPool *agtrpc.Pool
var HubRPCPool *hubrpc.Pool

// InitHubRPCPool
//
// @Description: 初始化HubRPC连接池
// @param cfg
func InitHubRPCPool(cfg *agtrpc.PoolConfig) {
HubRPCPool = agtrpc.NewPool(cfg)
func InitHubRPCPool(cfg *hubrpc.PoolConfig) {
HubRPCPool = hubrpc.NewPool(cfg)
}

common/pkgs/grpc/agent/client.go → common/pkgs/grpc/hub/client.go View File


common/pkgs/grpc/agent/agent.pb.go → common/pkgs/grpc/hub/hub.pb.go View File


common/pkgs/grpc/agent/agent.proto → common/pkgs/grpc/hub/hub.proto View File


common/pkgs/grpc/agent/agent_grpc.pb.go → common/pkgs/grpc/hub/hub_grpc.pb.go View File


common/pkgs/grpc/agent/pool.go → common/pkgs/grpc/hub/pool.go View File


common/pkgs/ioswitch2/agent_worker.go → common/pkgs/ioswitch2/hub_worker.go View File

@@ -9,7 +9,7 @@ import (
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/serder"
stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub"
hubrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub"
cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types"
)

@@ -47,7 +47,7 @@ func (w *HubWorker) Equals(worker exec.WorkerInfo) bool {

type HubWorkerClient struct {
hubID cortypes.HubID
cli *agtrpc.PoolClient
cli *hubrpc.PoolClient
}

func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error {

+ 4
- 4
common/pkgs/ioswitch2/ops2/faas.go View File

@@ -7,7 +7,7 @@ import (
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/hubpool"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types"
)

@@ -20,17 +20,17 @@ type InternalFaaSGalMultiply struct {
}

func (o *InternalFaaSGalMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
stgAgts, err := exec.GetValueByType[*agtpool.HubPool](ctx)
stgHubs, err := exec.GetValueByType[*hubpool.HubPool](ctx)
if err != nil {
return err
}

fass, err := agtpool.GetComponent[types.InternalFaaSCall](stgAgts, o.StorageID)
fass, err := hubpool.GetComponent[types.InternalFaaSCall](stgHubs, o.StorageID)
if err != nil {
return fmt.Errorf("getting faas component: %w", err)
}

tmp, err := agtpool.GetComponent[types.TempStore](stgAgts, o.StorageID)
tmp, err := hubpool.GetComponent[types.TempStore](stgHubs, o.StorageID)
if err != nil {
return fmt.Errorf("getting temp store component: %w", err)
}


common/pkgs/ioswitchlrc/agent_worker.go → common/pkgs/ioswitchlrc/hub_worker.go View File

@@ -6,7 +6,7 @@ import (

"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub"
hubrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub"
cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types"
)

@@ -42,7 +42,7 @@ func (w *HubWorker) Equals(worker exec.WorkerInfo) bool {
}

type HubWorkerClient struct {
cli *agtrpc.PoolClient
cli *hubrpc.PoolClient
}

func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error {

common/pkgs/mq/agent/client.go → common/pkgs/mq/hub/client.go View File


common/pkgs/mq/agent/agent.go → common/pkgs/mq/hub/hub.go View File


common/pkgs/mq/agent/server.go → common/pkgs/mq/hub/server.go View File


common/pkgs/mq/agent/storage.go → common/pkgs/mq/hub/storage.go View File


+ 10
- 10
hub/internal/cmd/serve.go View File

@@ -17,7 +17,7 @@ import (
stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
"gitlink.org.cn/cloudream/storage2/common/models/datamap"
"gitlink.org.cn/cloudream/storage2/common/pkgs/distlock"
agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub"
hubrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub"
"gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent"
cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types"
"gitlink.org.cn/cloudream/storage2/hub/internal/config"
@@ -25,7 +25,7 @@ import (
"google.golang.org/grpc"

coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub"
hubmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub"

grpcsvc "gitlink.org.cn/cloudream/storage2/hub/internal/grpc"
cmdsvc "gitlink.org.cn/cloudream/storage2/hub/internal/mq"
@@ -60,7 +60,7 @@ func serve(configPath string, httpAddr string) {

stgglb.InitLocal(config.Cfg().Local)
stgglb.InitMQPool(config.Cfg().RabbitMQ)
stgglb.InitHubRPCPool(&agtrpc.PoolConfig{})
stgglb.InitHubRPCPool(&hubrpc.PoolConfig{})
// stgglb.Stats.SetupHubStorageTransfer(*config.Cfg().Local.HubID)
// stgglb.Stats.SetupHubTransfer(*config.Cfg().Local.HubID)
// 获取Hub配置
@@ -152,14 +152,14 @@ func serve(configPath string, httpAddr string) {

// 启动命令服务器
// TODO 需要设计HubID持久化机制
agtSvr, err := agtmq.NewServer(cmdsvc.NewService(stgPool), config.Cfg().ID, config.Cfg().RabbitMQ)
hubSvr, err := hubmq.NewServer(cmdsvc.NewService(stgPool), config.Cfg().ID, config.Cfg().RabbitMQ)
if err != nil {
logger.Fatalf("new hub server failed, err: %s", err.Error())
}
agtSvr.OnError(func(err error) {
hubSvr.OnError(func(err error) {
logger.Warnf("hub server err: %s", err.Error())
})
go serveHubServer(agtSvr)
go serveHubServer(hubSvr)

// 启动GRPC服务
listenAddr := config.Cfg().GRPC.MakeListenAddress()
@@ -168,7 +168,7 @@ func serve(configPath string, httpAddr string) {
logger.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error())
}
s := grpc.NewServer()
agtrpc.RegisterHubServer(s, grpcsvc.NewService(&worker, stgPool))
hubrpc.RegisterHubServer(s, grpcsvc.NewService(&worker, stgPool))
go serveGRPC(s, lis)

go serveDistLock(distlock)
@@ -229,7 +229,7 @@ loop:
os.Exit(1)
}

func setupTickTask(agtPool *pool.Pool, evtPub *sysevent.Publisher) gocron.Scheduler {
func setupTickTask(hubPool *pool.Pool, evtPub *sysevent.Publisher) gocron.Scheduler {
sch, err := gocron.NewScheduler()
if err != nil {
logger.Errorf("new cron scheduler: %s", err.Error())
@@ -238,7 +238,7 @@ func setupTickTask(agtPool *pool.Pool, evtPub *sysevent.Publisher) gocron.Schedu

// sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(
// gocron.NewAtTime(0, 0, 0),
// )), gocron.NewTask(tickevent.ReportStorageStats, agtPool, evtPub))
// )), gocron.NewTask(tickevent.ReportStorageStats, hubPool, evtPub))

// sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(
// gocron.NewAtTime(0, 0, 1),
@@ -251,7 +251,7 @@ func setupTickTask(agtPool *pool.Pool, evtPub *sysevent.Publisher) gocron.Schedu
return sch
}

func serveHubServer(server *agtmq.Server) {
func serveHubServer(server *hubmq.Server) {
logger.Info("start serving command server")

ch := server.Start()


+ 17
- 17
hub/internal/grpc/io.go View File

@@ -11,10 +11,10 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/serder"
agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub"
hubrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub"
)

func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanReq) (*agtrpc.ExecuteIOPlanResp, error) {
func (s *Service) ExecuteIOPlan(ctx context.Context, req *hubrpc.ExecuteIOPlanReq) (*hubrpc.ExecuteIOPlanResp, error) {
plan, err := serder.JSONToObjectEx[exec.Plan]([]byte(req.Plan))
if err != nil {
return nil, fmt.Errorf("deserializing plan: %w", err)
@@ -29,7 +29,7 @@ func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanRe
defer s.swWorker.Remove(sw)

execCtx := exec.NewWithContext(ctx)
exec.SetValueByType(execCtx, s.stgAgts)
exec.SetValueByType(execCtx, s.stgPool)
_, err = sw.Run(execCtx)
if err != nil {
log.Warnf("running io plan: %v", err)
@@ -37,15 +37,15 @@ func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanRe
}

log.Infof("plan finished")
return &agtrpc.ExecuteIOPlanResp{}, nil
return &hubrpc.ExecuteIOPlanResp{}, nil
}

func (s *Service) SendStream(server agtrpc.Hub_SendStreamServer) error {
func (s *Service) SendStream(server hubrpc.Hub_SendStreamServer) error {
msg, err := server.Recv()
if err != nil {
return fmt.Errorf("recving stream id packet: %w", err)
}
if msg.Type != agtrpc.StreamDataPacketType_SendArgs {
if msg.Type != hubrpc.StreamDataPacketType_SendArgs {
return fmt.Errorf("first packet must be a SendArgs packet")
}

@@ -94,7 +94,7 @@ func (s *Service) SendStream(server agtrpc.Hub_SendStreamServer) error {

recvSize += int64(len(msg.Data))

if msg.Type == agtrpc.StreamDataPacketType_EOF {
if msg.Type == hubrpc.StreamDataPacketType_EOF {
// 客户端明确说明文件传输已经结束,那么结束写入,获得文件Hash
err := pw.Close()
if err != nil {
@@ -103,7 +103,7 @@ func (s *Service) SendStream(server agtrpc.Hub_SendStreamServer) error {
}

// 并将结果返回到客户端
err = server.SendAndClose(&agtrpc.SendStreamResp{})
err = server.SendAndClose(&hubrpc.SendStreamResp{})
if err != nil {
logger.Warnf("send response failed, err: %s", err.Error())
return fmt.Errorf("send response failed, err: %w", err)
@@ -114,7 +114,7 @@ func (s *Service) SendStream(server agtrpc.Hub_SendStreamServer) error {
}
}

func (s *Service) GetStream(req *agtrpc.GetStreamReq, server agtrpc.Hub_GetStreamServer) error {
func (s *Service) GetStream(req *hubrpc.GetStreamReq, server hubrpc.Hub_GetStreamServer) error {
logger.
WithField("PlanID", req.PlanID).
WithField("VarID", req.VarID).
@@ -152,8 +152,8 @@ func (s *Service) GetStream(req *agtrpc.GetStreamReq, server agtrpc.Hub_GetStrea

if readCnt > 0 {
readAllCnt += readCnt
err = server.Send(&agtrpc.StreamDataPacket{
Type: agtrpc.StreamDataPacketType_Data,
err = server.Send(&hubrpc.StreamDataPacket{
Type: hubrpc.StreamDataPacketType_Data,
Data: buf[:readCnt],
})
if err != nil {
@@ -173,8 +173,8 @@ func (s *Service) GetStream(req *agtrpc.GetStreamReq, server agtrpc.Hub_GetStrea
WithField("VarID", req.VarID).
Debugf("send data size %d in %v, speed %v/s", readAllCnt, dt, bytesize.New(float64(readAllCnt)/dt.Seconds()))
// 发送EOF消息
server.Send(&agtrpc.StreamDataPacket{
Type: agtrpc.StreamDataPacketType_EOF,
server.Send(&hubrpc.StreamDataPacket{
Type: hubrpc.StreamDataPacketType_EOF,
})
return nil
}
@@ -190,7 +190,7 @@ func (s *Service) GetStream(req *agtrpc.GetStreamReq, server agtrpc.Hub_GetStrea
}
}

func (s *Service) SendVar(ctx context.Context, req *agtrpc.SendVarReq) (*agtrpc.SendVarResp, error) {
func (s *Service) SendVar(ctx context.Context, req *hubrpc.SendVarReq) (*hubrpc.SendVarResp, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()

@@ -205,10 +205,10 @@ func (s *Service) SendVar(ctx context.Context, req *agtrpc.SendVarReq) (*agtrpc.
}

sw.PutVar(exec.VarID(req.VarID), v)
return &agtrpc.SendVarResp{}, nil
return &hubrpc.SendVarResp{}, nil
}

func (s *Service) GetVar(ctx context.Context, req *agtrpc.GetVarReq) (*agtrpc.GetVarResp, error) {
func (s *Service) GetVar(ctx context.Context, req *hubrpc.GetVarReq) (*hubrpc.GetVarResp, error) {
ctx2, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()

@@ -234,7 +234,7 @@ func (s *Service) GetVar(ctx context.Context, req *agtrpc.GetVarReq) (*agtrpc.Ge
return nil, fmt.Errorf("serializing var: %w", err)
}

return &agtrpc.GetVarResp{
return &hubrpc.GetVarResp{
Var: string(vd),
}, nil
}

+ 5
- 5
hub/internal/grpc/ping.go View File

@@ -3,18 +3,18 @@ package grpc
import (
"context"

agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub"
hubrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub"
)

// Ping 是一个RPC方法,用于验证服务的可用性。
//
// 参数:
// context.Context: 传递上下文信息,包括请求的元数据和取消信号。
// *agtrpc.PingReq: 传递的Ping请求数据,当前实现中未使用。
// *hubrpc.PingReq: 传递的Ping请求数据,当前实现中未使用。
//
// 返回值:
// *agtrpc.PingResp: Ping响应数据,当前实现中始终返回空响应。
// *hubrpc.PingResp: Ping响应数据,当前实现中始终返回空响应。
// error: 如果处理过程中出现错误,则返回错误信息;否则返回nil。
func (s *Service) Ping(context.Context, *agtrpc.PingReq) (*agtrpc.PingResp, error) {
return &agtrpc.PingResp{}, nil
func (s *Service) Ping(context.Context, *hubrpc.PingReq) (*hubrpc.PingResp, error) {
return &hubrpc.PingResp{}, nil
}

+ 3
- 3
hub/internal/grpc/service.go View File

@@ -9,12 +9,12 @@ import (
type Service struct {
hubserver.HubServer
swWorker *exec.Worker
stgAgts *pool.Pool
stgPool *pool.Pool
}

func NewService(swWorker *exec.Worker, stgAgts *pool.Pool) *Service {
func NewService(swWorker *exec.Worker, stgPool *pool.Pool) *Service {
return &Service{
swWorker: swWorker,
stgAgts: stgAgts,
stgPool: stgPool,
}
}

+ 1
- 1
hub/internal/http/hub_io.go View File

@@ -162,7 +162,7 @@ func (s *IOService) ExecuteIOPlan(ctx *gin.Context) {
defer s.svc.swWorker.Remove(sw)

execCtx := exec.NewWithContext(ctx.Request.Context())
exec.SetValueByType(execCtx, s.svc.stgAgts)
exec.SetValueByType(execCtx, s.svc.stgPool)
_, err = sw.Run(execCtx)
if err != nil {
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("executing plan: %v", err)))


+ 3
- 3
hub/internal/http/service.go View File

@@ -7,12 +7,12 @@ import (

type Service struct {
swWorker *exec.Worker
stgAgts *pool.Pool
stgPool *pool.Pool
}

func NewService(swWorker *exec.Worker, stgAgts *pool.Pool) *Service {
func NewService(swWorker *exec.Worker, stgPool *pool.Pool) *Service {
return &Service{
swWorker: swWorker,
stgAgts: stgAgts,
stgPool: stgPool,
}
}

+ 0
- 10
hub/internal/mq/agent.go View File

@@ -1,10 +0,0 @@
package mq

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub"
)

func (svc *Service) GetState(msg *agtmq.GetState) (*agtmq.GetStateResp, *mq.CodeMessage) {
return mq.ReplyOK(agtmq.NewGetStateResp())
}

+ 10
- 0
hub/internal/mq/hub.go View File

@@ -0,0 +1,10 @@
package mq

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
hubmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub"
)

func (svc *Service) GetState(msg *hubmq.GetState) (*hubmq.GetStateResp, *mq.CodeMessage) {
return mq.ReplyOK(hubmq.NewGetStateResp())
}

+ 3
- 3
hub/internal/mq/service.go View File

@@ -5,11 +5,11 @@ import (
)

type Service struct {
stgAgts *pool.Pool
stgPool *pool.Pool
}

func NewService(stgAgts *pool.Pool) *Service {
func NewService(stgPool *pool.Pool) *Service {
return &Service{
stgAgts: stgAgts,
stgPool: stgPool,
}
}

+ 4
- 4
hub/internal/mq/storage.go View File

@@ -6,11 +6,11 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub"
hubmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
)

func (svc *Service) StorageCreatePackage(msg *agtmq.StorageCreatePackage) (*agtmq.StorageCreatePackageResp, *mq.CodeMessage) {
func (svc *Service) StorageCreatePackage(msg *hubmq.StorageCreatePackage) (*hubmq.StorageCreatePackageResp, *mq.CodeMessage) {
coorCli, err := stgglb.CoordinatorMQPool.Acquire()
if err != nil {
logger.Warnf("new coordinator client: %s", err.Error())
@@ -19,7 +19,7 @@ func (svc *Service) StorageCreatePackage(msg *agtmq.StorageCreatePackage) (*agtm
}
defer stgglb.CoordinatorMQPool.Release(coorCli)

pub, err := svc.stgAgts.GetPublicStore(msg.StorageID)
pub, err := svc.stgHubs.GetPublicStore(msg.StorageID)
if err != nil {
return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}
@@ -54,6 +54,6 @@ func (svc *Service) StorageCreatePackage(msg *agtmq.StorageCreatePackage) (*agtm
return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}

return mq.ReplyOK(agtmq.RespStorageCreatePackage(createResp.Package))
return mq.ReplyOK(hubmq.RespStorageCreatePackage(createResp.Package))
}
*/

+ 1
- 1
hub/internal/task/cache_move_package.go View File

@@ -40,7 +40,7 @@ func (t *CacheMovePackage) do(ctx TaskContext) error {
log.Debugf("begin with %v", logger.FormatStruct(t))
defer log.Debugf("end")

store, err := ctx.stgAgts.GetShardStore(t.storageID)
store, err := ctx.stgHubs.GetShardStore(t.storageID)
if err != nil {
return fmt.Errorf("get shard store of storage %v: %w", t.storageID, err)
}


+ 4
- 4
hub/internal/task/task.go View File

@@ -7,7 +7,7 @@ import (
"gitlink.org.cn/cloudream/storage2/common/pkgs/accessstat"
"gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage2/common/pkgs/downloader"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/hubpool"
"gitlink.org.cn/cloudream/storage2/common/pkgs/uploader"
)

@@ -17,7 +17,7 @@ type TaskContext struct {
connectivity *connectivity.Collector
downloader *downloader.Downloader
accessStat *accessstat.AccessStat
stgAgts *agtpool.HubPool
stgHubs *hubpool.HubPool
uploader *uploader.Uploader
}

@@ -36,13 +36,13 @@ type Task = task.Task[TaskContext]
// CompleteOption 类型定义了任务完成时的选项,可用于定制化任务完成的处理方式
type CompleteOption = task.CompleteOption

func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgAgts *agtpool.HubPool, uploader *uploader.Uploader) Manager {
func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgHubs *hubpool.HubPool, uploader *uploader.Uploader) Manager {
return task.NewManager(TaskContext{
distlock: distlock,
connectivity: connectivity,
downloader: downloader,
accessStat: accessStat,
stgAgts: stgAgts,
stgHubs: stgHubs,
uploader: uploader,
})
}


+ 2
- 2
hub/internal/tickevent/report_hub_stats.go View File

@@ -5,7 +5,7 @@ import (
"gitlink.org.cn/cloudream/common/utils/math2"
stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/hubpool"
"gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent"
)

@@ -35,7 +35,7 @@ func ReportHubTransferStats(evtPub *sysevent.Publisher) {
}
}

func ReportHubStorageTransferStats(stgAgts *agtpool.HubPool, evtPub *sysevent.Publisher) {
func ReportHubStorageTransferStats(stgHubs *hubpool.HubPool, evtPub *sysevent.Publisher) {
if stgglb.Stats.HubStorageTransfer == nil {
return
}


+ 3
- 3
hub/internal/tickevent/report_storage_stats.go View File

@@ -3,12 +3,12 @@ package tickevent
/*
import (
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool"
"gitlink.org.cn/cloudream/storage2/common/pkgs/storage/hubpool"
"gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent"
)

func ReportStorageStats(agtPool *agtpool.HubPool, evtPub *sysevent.Publisher) {
stgs := agtPool.GetAllHubs()
func ReportStorageStats(hubPool *hubpool.HubPool, evtPub *sysevent.Publisher) {
stgs := hubPool.GetAllHubs()
for _, stg := range stgs {
shard, err := stg.GetShardStore()
if err != nil {


Loading…
Cancel
Save