diff --git a/client/internal/cmdline/mount.go b/client/internal/cmdline/mount.go index 39e8a1c..1aa0789 100644 --- a/client/internal/cmdline/mount.go +++ b/client/internal/cmdline/mount.go @@ -18,8 +18,8 @@ import ( stgglb "gitlink.org.cn/cloudream/storage2/common/globals" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" - agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" - agtpool "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" + hubrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" + hubpool "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" ) func init() { @@ -52,12 +52,12 @@ func mountCmd(mountPoint string, configPath string) { stgglb.InitLocal(config.Cfg().Local) stgglb.InitMQPool(config.Cfg().RabbitMQ) - stgglb.InitHubRPCPool(&agtrpc.PoolConfig{}) + stgglb.InitHubRPCPool(&hubrpc.PoolConfig{}) // stgglb.Stats.SetupHubStorageTransfer(*config.Cfg().Local.HubID) // stgglb.Stats.SetupHubTransfer(*config.Cfg().Local.HubID) // 初始化存储服务管理器 - stgPool := agtpool.NewPool() + stgPool := hubpool.NewPool() db, err := db2.NewDB(&config.Cfg().DB) if err != nil { diff --git a/client/internal/cmdline/serve.go b/client/internal/cmdline/serve.go index 474a607..3acccb2 100644 --- a/client/internal/cmdline/serve.go +++ b/client/internal/cmdline/serve.go @@ -21,7 +21,7 @@ import ( "gitlink.org.cn/cloudream/storage2/common/models/datamap" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" - agtpool "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent" ) @@ -100,16 +100,16 @@ func serveHTTP(configPath string, listenAddr string) { go serveAccessStat(acStat) // 存储管理器 - stgAgts := agtpool.NewPool() + stgPool := pool.NewPool() // 下载策略 strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) // 下载器 - dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgAgts, strgSel, db) + dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgPool, strgSel, db) // 上传器 - uploader := uploader.NewUploader(distlockSvc, &conCol, stgAgts, stgMeta, db) + uploader := uploader.NewUploader(distlockSvc, &conCol, stgPool, stgMeta, db) svc, err := services.NewService(distlockSvc, &dlder, acStat, uploader, strgSel, stgMeta, db, evtPub) if err != nil { diff --git a/client/internal/config/config.go b/client/internal/config/config.go index 07fb281..d8f13bb 100644 --- a/client/internal/config/config.go +++ b/client/internal/config/config.go @@ -11,12 +11,12 @@ import ( clitypes "gitlink.org.cn/cloudream/storage2/client/types" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" - agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" + hubrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" ) type Config struct { Local stgglb.LocalMachineInfo `json:"local"` - HubGRPC agtrpc.PoolConfig `json:"hubGRPC"` + HubGRPC hubrpc.PoolConfig `json:"hubGRPC"` Logger logger.Config `json:"logger"` DB db.Config `json:"db"` RabbitMQ mq.Config `json:"rabbitMQ"` diff --git a/client/internal/services/cache.go b/client/internal/services/cache.go index 1c62521..b4cd62b 100644 --- a/client/internal/services/cache.go +++ b/client/internal/services/cache.go @@ -8,7 +8,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" - agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub" + hubmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub" coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/factory" ) @@ -43,7 +43,7 @@ func (svc *CacheService) StartCacheMovePackage(userID cdssdk.UserID, packageID c } defer stgglb.HubMQPool.Release(hubCli) - startResp, err := hubCli.StartCacheMovePackage(agtmq.NewStartCacheMovePackage(userID, packageID, stgID)) + startResp, err := hubCli.StartCacheMovePackage(hubmq.NewStartCacheMovePackage(userID, packageID, stgID)) if err != nil { return 0, "", fmt.Errorf("start cache move package: %w", err) } @@ -58,7 +58,7 @@ func (svc *CacheService) WaitCacheMovePackage(hubID cdssdk.HubID, taskID string, } defer stgglb.HubMQPool.Release(hubCli) - waitResp, err := hubCli.WaitCacheMovePackage(agtmq.NewWaitCacheMovePackage(taskID, waitTimeout.Milliseconds())) + waitResp, err := hubCli.WaitCacheMovePackage(hubmq.NewWaitCacheMovePackage(taskID, waitTimeout.Milliseconds())) if err != nil { return true, fmt.Errorf("wait cache move package: %w", err) } diff --git a/client/internal/services/storage.go b/client/internal/services/storage.go index 1dc5bbe..5094b31 100644 --- a/client/internal/services/storage.go +++ b/client/internal/services/storage.go @@ -144,7 +144,7 @@ func (svc *UserSpaceService) UserSpaceCreatePackage(bucketID cdssdk.BucketID, na // } // defer stgglb.HubMQPool.Release(hubCli) - // createResp, err := hubCli.UserSpaceCreatePackage(agtmq.ReqUserSpaceCreatePackage(bucketID, name, userspaceID, path, userspaceAffinity)) + // createResp, err := hubCli.UserSpaceCreatePackage(hubmq.ReqUserSpaceCreatePackage(bucketID, name, userspaceID, path, userspaceAffinity)) // if err != nil { // return cdssdk.Package{}, err // } diff --git a/common/globals/pools.go b/common/globals/pools.go index 6524859..30c46c4 100644 --- a/common/globals/pools.go +++ b/common/globals/pools.go @@ -2,13 +2,13 @@ package stgglb import ( "gitlink.org.cn/cloudream/common/pkgs/mq" - agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" + hubrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" - agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub" + hubmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub" scmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/scanner" ) -var HubMQPool agtmq.Pool +var HubMQPool hubmq.Pool var CoordinatorMQPool coormq.Pool @@ -19,7 +19,7 @@ var ScannerMQPool scmq.Pool // @Description: 初始化MQ连接池 // @param cfg func InitMQPool(cfg mq.Config) { - HubMQPool = agtmq.NewPool(cfg) + HubMQPool = hubmq.NewPool(cfg) CoordinatorMQPool = coormq.NewPool(cfg) @@ -27,12 +27,12 @@ func InitMQPool(cfg mq.Config) { } -var HubRPCPool *agtrpc.Pool +var HubRPCPool *hubrpc.Pool // InitHubRPCPool // // @Description: 初始化HubRPC连接池 // @param cfg -func InitHubRPCPool(cfg *agtrpc.PoolConfig) { - HubRPCPool = agtrpc.NewPool(cfg) +func InitHubRPCPool(cfg *hubrpc.PoolConfig) { + HubRPCPool = hubrpc.NewPool(cfg) } diff --git a/common/pkgs/grpc/agent/client.go b/common/pkgs/grpc/hub/client.go similarity index 100% rename from common/pkgs/grpc/agent/client.go rename to common/pkgs/grpc/hub/client.go diff --git a/common/pkgs/grpc/agent/agent.pb.go b/common/pkgs/grpc/hub/hub.pb.go similarity index 100% rename from common/pkgs/grpc/agent/agent.pb.go rename to common/pkgs/grpc/hub/hub.pb.go diff --git a/common/pkgs/grpc/agent/agent.proto b/common/pkgs/grpc/hub/hub.proto similarity index 100% rename from common/pkgs/grpc/agent/agent.proto rename to common/pkgs/grpc/hub/hub.proto diff --git a/common/pkgs/grpc/agent/agent_grpc.pb.go b/common/pkgs/grpc/hub/hub_grpc.pb.go similarity index 100% rename from common/pkgs/grpc/agent/agent_grpc.pb.go rename to common/pkgs/grpc/hub/hub_grpc.pb.go diff --git a/common/pkgs/grpc/agent/pool.go b/common/pkgs/grpc/hub/pool.go similarity index 100% rename from common/pkgs/grpc/agent/pool.go rename to common/pkgs/grpc/hub/pool.go diff --git a/common/pkgs/ioswitch2/agent_worker.go b/common/pkgs/ioswitch2/hub_worker.go similarity index 96% rename from common/pkgs/ioswitch2/agent_worker.go rename to common/pkgs/ioswitch2/hub_worker.go index db812b9..b791ae2 100644 --- a/common/pkgs/ioswitch2/agent_worker.go +++ b/common/pkgs/ioswitch2/hub_worker.go @@ -9,7 +9,7 @@ import ( "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/serder" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" - agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" + hubrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) @@ -47,7 +47,7 @@ func (w *HubWorker) Equals(worker exec.WorkerInfo) bool { type HubWorkerClient struct { hubID cortypes.HubID - cli *agtrpc.PoolClient + cli *hubrpc.PoolClient } func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error { diff --git a/common/pkgs/ioswitch2/ops2/faas.go b/common/pkgs/ioswitch2/ops2/faas.go index a9da667..5cf006f 100644 --- a/common/pkgs/ioswitch2/ops2/faas.go +++ b/common/pkgs/ioswitch2/ops2/faas.go @@ -7,7 +7,7 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/hubpool" "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/types" ) @@ -20,17 +20,17 @@ type InternalFaaSGalMultiply struct { } func (o *InternalFaaSGalMultiply) Execute(ctx *exec.ExecContext, e *exec.Executor) error { - stgAgts, err := exec.GetValueByType[*agtpool.HubPool](ctx) + stgHubs, err := exec.GetValueByType[*hubpool.HubPool](ctx) if err != nil { return err } - fass, err := agtpool.GetComponent[types.InternalFaaSCall](stgAgts, o.StorageID) + fass, err := hubpool.GetComponent[types.InternalFaaSCall](stgHubs, o.StorageID) if err != nil { return fmt.Errorf("getting faas component: %w", err) } - tmp, err := agtpool.GetComponent[types.TempStore](stgAgts, o.StorageID) + tmp, err := hubpool.GetComponent[types.TempStore](stgHubs, o.StorageID) if err != nil { return fmt.Errorf("getting temp store component: %w", err) } diff --git a/common/pkgs/ioswitchlrc/agent_worker.go b/common/pkgs/ioswitchlrc/hub_worker.go similarity index 95% rename from common/pkgs/ioswitchlrc/agent_worker.go rename to common/pkgs/ioswitchlrc/hub_worker.go index 3638e9c..7c6b703 100644 --- a/common/pkgs/ioswitchlrc/agent_worker.go +++ b/common/pkgs/ioswitchlrc/hub_worker.go @@ -6,7 +6,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" - agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" + hubrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" ) @@ -42,7 +42,7 @@ func (w *HubWorker) Equals(worker exec.WorkerInfo) bool { } type HubWorkerClient struct { - cli *agtrpc.PoolClient + cli *hubrpc.PoolClient } func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error { diff --git a/common/pkgs/mq/agent/client.go b/common/pkgs/mq/hub/client.go similarity index 100% rename from common/pkgs/mq/agent/client.go rename to common/pkgs/mq/hub/client.go diff --git a/common/pkgs/mq/agent/agent.go b/common/pkgs/mq/hub/hub.go similarity index 100% rename from common/pkgs/mq/agent/agent.go rename to common/pkgs/mq/hub/hub.go diff --git a/common/pkgs/mq/agent/server.go b/common/pkgs/mq/hub/server.go similarity index 100% rename from common/pkgs/mq/agent/server.go rename to common/pkgs/mq/hub/server.go diff --git a/common/pkgs/mq/agent/storage.go b/common/pkgs/mq/hub/storage.go similarity index 100% rename from common/pkgs/mq/agent/storage.go rename to common/pkgs/mq/hub/storage.go diff --git a/hub/internal/cmd/serve.go b/hub/internal/cmd/serve.go index 1ea8a85..c93513d 100644 --- a/hub/internal/cmd/serve.go +++ b/hub/internal/cmd/serve.go @@ -17,7 +17,7 @@ import ( stgglb "gitlink.org.cn/cloudream/storage2/common/globals" "gitlink.org.cn/cloudream/storage2/common/models/datamap" "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" - agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" + hubrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" "gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent" cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" "gitlink.org.cn/cloudream/storage2/hub/internal/config" @@ -25,7 +25,7 @@ import ( "google.golang.org/grpc" coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" - agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub" + hubmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub" grpcsvc "gitlink.org.cn/cloudream/storage2/hub/internal/grpc" cmdsvc "gitlink.org.cn/cloudream/storage2/hub/internal/mq" @@ -60,7 +60,7 @@ func serve(configPath string, httpAddr string) { stgglb.InitLocal(config.Cfg().Local) stgglb.InitMQPool(config.Cfg().RabbitMQ) - stgglb.InitHubRPCPool(&agtrpc.PoolConfig{}) + stgglb.InitHubRPCPool(&hubrpc.PoolConfig{}) // stgglb.Stats.SetupHubStorageTransfer(*config.Cfg().Local.HubID) // stgglb.Stats.SetupHubTransfer(*config.Cfg().Local.HubID) // 获取Hub配置 @@ -152,14 +152,14 @@ func serve(configPath string, httpAddr string) { // 启动命令服务器 // TODO 需要设计HubID持久化机制 - agtSvr, err := agtmq.NewServer(cmdsvc.NewService(stgPool), config.Cfg().ID, config.Cfg().RabbitMQ) + hubSvr, err := hubmq.NewServer(cmdsvc.NewService(stgPool), config.Cfg().ID, config.Cfg().RabbitMQ) if err != nil { logger.Fatalf("new hub server failed, err: %s", err.Error()) } - agtSvr.OnError(func(err error) { + hubSvr.OnError(func(err error) { logger.Warnf("hub server err: %s", err.Error()) }) - go serveHubServer(agtSvr) + go serveHubServer(hubSvr) // 启动GRPC服务 listenAddr := config.Cfg().GRPC.MakeListenAddress() @@ -168,7 +168,7 @@ func serve(configPath string, httpAddr string) { logger.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error()) } s := grpc.NewServer() - agtrpc.RegisterHubServer(s, grpcsvc.NewService(&worker, stgPool)) + hubrpc.RegisterHubServer(s, grpcsvc.NewService(&worker, stgPool)) go serveGRPC(s, lis) go serveDistLock(distlock) @@ -229,7 +229,7 @@ loop: os.Exit(1) } -func setupTickTask(agtPool *pool.Pool, evtPub *sysevent.Publisher) gocron.Scheduler { +func setupTickTask(hubPool *pool.Pool, evtPub *sysevent.Publisher) gocron.Scheduler { sch, err := gocron.NewScheduler() if err != nil { logger.Errorf("new cron scheduler: %s", err.Error()) @@ -238,7 +238,7 @@ func setupTickTask(agtPool *pool.Pool, evtPub *sysevent.Publisher) gocron.Schedu // sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes( // gocron.NewAtTime(0, 0, 0), - // )), gocron.NewTask(tickevent.ReportStorageStats, agtPool, evtPub)) + // )), gocron.NewTask(tickevent.ReportStorageStats, hubPool, evtPub)) // sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes( // gocron.NewAtTime(0, 0, 1), @@ -251,7 +251,7 @@ func setupTickTask(agtPool *pool.Pool, evtPub *sysevent.Publisher) gocron.Schedu return sch } -func serveHubServer(server *agtmq.Server) { +func serveHubServer(server *hubmq.Server) { logger.Info("start serving command server") ch := server.Start() diff --git a/hub/internal/grpc/io.go b/hub/internal/grpc/io.go index be4dbd4..137cdc6 100644 --- a/hub/internal/grpc/io.go +++ b/hub/internal/grpc/io.go @@ -11,10 +11,10 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/io2" "gitlink.org.cn/cloudream/common/utils/serder" - agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" + hubrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" ) -func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanReq) (*agtrpc.ExecuteIOPlanResp, error) { +func (s *Service) ExecuteIOPlan(ctx context.Context, req *hubrpc.ExecuteIOPlanReq) (*hubrpc.ExecuteIOPlanResp, error) { plan, err := serder.JSONToObjectEx[exec.Plan]([]byte(req.Plan)) if err != nil { return nil, fmt.Errorf("deserializing plan: %w", err) @@ -29,7 +29,7 @@ func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanRe defer s.swWorker.Remove(sw) execCtx := exec.NewWithContext(ctx) - exec.SetValueByType(execCtx, s.stgAgts) + exec.SetValueByType(execCtx, s.stgPool) _, err = sw.Run(execCtx) if err != nil { log.Warnf("running io plan: %v", err) @@ -37,15 +37,15 @@ func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanRe } log.Infof("plan finished") - return &agtrpc.ExecuteIOPlanResp{}, nil + return &hubrpc.ExecuteIOPlanResp{}, nil } -func (s *Service) SendStream(server agtrpc.Hub_SendStreamServer) error { +func (s *Service) SendStream(server hubrpc.Hub_SendStreamServer) error { msg, err := server.Recv() if err != nil { return fmt.Errorf("recving stream id packet: %w", err) } - if msg.Type != agtrpc.StreamDataPacketType_SendArgs { + if msg.Type != hubrpc.StreamDataPacketType_SendArgs { return fmt.Errorf("first packet must be a SendArgs packet") } @@ -94,7 +94,7 @@ func (s *Service) SendStream(server agtrpc.Hub_SendStreamServer) error { recvSize += int64(len(msg.Data)) - if msg.Type == agtrpc.StreamDataPacketType_EOF { + if msg.Type == hubrpc.StreamDataPacketType_EOF { // 客户端明确说明文件传输已经结束,那么结束写入,获得文件Hash err := pw.Close() if err != nil { @@ -103,7 +103,7 @@ func (s *Service) SendStream(server agtrpc.Hub_SendStreamServer) error { } // 并将结果返回到客户端 - err = server.SendAndClose(&agtrpc.SendStreamResp{}) + err = server.SendAndClose(&hubrpc.SendStreamResp{}) if err != nil { logger.Warnf("send response failed, err: %s", err.Error()) return fmt.Errorf("send response failed, err: %w", err) @@ -114,7 +114,7 @@ func (s *Service) SendStream(server agtrpc.Hub_SendStreamServer) error { } } -func (s *Service) GetStream(req *agtrpc.GetStreamReq, server agtrpc.Hub_GetStreamServer) error { +func (s *Service) GetStream(req *hubrpc.GetStreamReq, server hubrpc.Hub_GetStreamServer) error { logger. WithField("PlanID", req.PlanID). WithField("VarID", req.VarID). @@ -152,8 +152,8 @@ func (s *Service) GetStream(req *agtrpc.GetStreamReq, server agtrpc.Hub_GetStrea if readCnt > 0 { readAllCnt += readCnt - err = server.Send(&agtrpc.StreamDataPacket{ - Type: agtrpc.StreamDataPacketType_Data, + err = server.Send(&hubrpc.StreamDataPacket{ + Type: hubrpc.StreamDataPacketType_Data, Data: buf[:readCnt], }) if err != nil { @@ -173,8 +173,8 @@ func (s *Service) GetStream(req *agtrpc.GetStreamReq, server agtrpc.Hub_GetStrea WithField("VarID", req.VarID). Debugf("send data size %d in %v, speed %v/s", readAllCnt, dt, bytesize.New(float64(readAllCnt)/dt.Seconds())) // 发送EOF消息 - server.Send(&agtrpc.StreamDataPacket{ - Type: agtrpc.StreamDataPacketType_EOF, + server.Send(&hubrpc.StreamDataPacket{ + Type: hubrpc.StreamDataPacketType_EOF, }) return nil } @@ -190,7 +190,7 @@ func (s *Service) GetStream(req *agtrpc.GetStreamReq, server agtrpc.Hub_GetStrea } } -func (s *Service) SendVar(ctx context.Context, req *agtrpc.SendVarReq) (*agtrpc.SendVarResp, error) { +func (s *Service) SendVar(ctx context.Context, req *hubrpc.SendVarReq) (*hubrpc.SendVarResp, error) { ctx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() @@ -205,10 +205,10 @@ func (s *Service) SendVar(ctx context.Context, req *agtrpc.SendVarReq) (*agtrpc. } sw.PutVar(exec.VarID(req.VarID), v) - return &agtrpc.SendVarResp{}, nil + return &hubrpc.SendVarResp{}, nil } -func (s *Service) GetVar(ctx context.Context, req *agtrpc.GetVarReq) (*agtrpc.GetVarResp, error) { +func (s *Service) GetVar(ctx context.Context, req *hubrpc.GetVarReq) (*hubrpc.GetVarResp, error) { ctx2, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() @@ -234,7 +234,7 @@ func (s *Service) GetVar(ctx context.Context, req *agtrpc.GetVarReq) (*agtrpc.Ge return nil, fmt.Errorf("serializing var: %w", err) } - return &agtrpc.GetVarResp{ + return &hubrpc.GetVarResp{ Var: string(vd), }, nil } diff --git a/hub/internal/grpc/ping.go b/hub/internal/grpc/ping.go index 0e1313a..84443a7 100644 --- a/hub/internal/grpc/ping.go +++ b/hub/internal/grpc/ping.go @@ -3,18 +3,18 @@ package grpc import ( "context" - agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" + hubrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub" ) // Ping 是一个RPC方法,用于验证服务的可用性。 // // 参数: // context.Context: 传递上下文信息,包括请求的元数据和取消信号。 -// *agtrpc.PingReq: 传递的Ping请求数据,当前实现中未使用。 +// *hubrpc.PingReq: 传递的Ping请求数据,当前实现中未使用。 // // 返回值: -// *agtrpc.PingResp: Ping响应数据,当前实现中始终返回空响应。 +// *hubrpc.PingResp: Ping响应数据,当前实现中始终返回空响应。 // error: 如果处理过程中出现错误,则返回错误信息;否则返回nil。 -func (s *Service) Ping(context.Context, *agtrpc.PingReq) (*agtrpc.PingResp, error) { - return &agtrpc.PingResp{}, nil +func (s *Service) Ping(context.Context, *hubrpc.PingReq) (*hubrpc.PingResp, error) { + return &hubrpc.PingResp{}, nil } diff --git a/hub/internal/grpc/service.go b/hub/internal/grpc/service.go index 7d6ef1d..7a75b85 100644 --- a/hub/internal/grpc/service.go +++ b/hub/internal/grpc/service.go @@ -9,12 +9,12 @@ import ( type Service struct { hubserver.HubServer swWorker *exec.Worker - stgAgts *pool.Pool + stgPool *pool.Pool } -func NewService(swWorker *exec.Worker, stgAgts *pool.Pool) *Service { +func NewService(swWorker *exec.Worker, stgPool *pool.Pool) *Service { return &Service{ swWorker: swWorker, - stgAgts: stgAgts, + stgPool: stgPool, } } diff --git a/hub/internal/http/hub_io.go b/hub/internal/http/hub_io.go index c03deae..b11e07f 100644 --- a/hub/internal/http/hub_io.go +++ b/hub/internal/http/hub_io.go @@ -162,7 +162,7 @@ func (s *IOService) ExecuteIOPlan(ctx *gin.Context) { defer s.svc.swWorker.Remove(sw) execCtx := exec.NewWithContext(ctx.Request.Context()) - exec.SetValueByType(execCtx, s.svc.stgAgts) + exec.SetValueByType(execCtx, s.svc.stgPool) _, err = sw.Run(execCtx) if err != nil { ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, fmt.Sprintf("executing plan: %v", err))) diff --git a/hub/internal/http/service.go b/hub/internal/http/service.go index e7f413c..5cc8ffe 100644 --- a/hub/internal/http/service.go +++ b/hub/internal/http/service.go @@ -7,12 +7,12 @@ import ( type Service struct { swWorker *exec.Worker - stgAgts *pool.Pool + stgPool *pool.Pool } -func NewService(swWorker *exec.Worker, stgAgts *pool.Pool) *Service { +func NewService(swWorker *exec.Worker, stgPool *pool.Pool) *Service { return &Service{ swWorker: swWorker, - stgAgts: stgAgts, + stgPool: stgPool, } } diff --git a/hub/internal/mq/agent.go b/hub/internal/mq/agent.go deleted file mode 100644 index 5f46a1b..0000000 --- a/hub/internal/mq/agent.go +++ /dev/null @@ -1,10 +0,0 @@ -package mq - -import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub" -) - -func (svc *Service) GetState(msg *agtmq.GetState) (*agtmq.GetStateResp, *mq.CodeMessage) { - return mq.ReplyOK(agtmq.NewGetStateResp()) -} diff --git a/hub/internal/mq/hub.go b/hub/internal/mq/hub.go new file mode 100644 index 0000000..1ef13f4 --- /dev/null +++ b/hub/internal/mq/hub.go @@ -0,0 +1,10 @@ +package mq + +import ( + "gitlink.org.cn/cloudream/common/pkgs/mq" + hubmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub" +) + +func (svc *Service) GetState(msg *hubmq.GetState) (*hubmq.GetStateResp, *mq.CodeMessage) { + return mq.ReplyOK(hubmq.NewGetStateResp()) +} diff --git a/hub/internal/mq/service.go b/hub/internal/mq/service.go index eb0ab15..0b1397c 100644 --- a/hub/internal/mq/service.go +++ b/hub/internal/mq/service.go @@ -5,11 +5,11 @@ import ( ) type Service struct { - stgAgts *pool.Pool + stgPool *pool.Pool } -func NewService(stgAgts *pool.Pool) *Service { +func NewService(stgPool *pool.Pool) *Service { return &Service{ - stgAgts: stgAgts, + stgPool: stgPool, } } diff --git a/hub/internal/mq/storage.go b/hub/internal/mq/storage.go index f566645..db2f2f2 100644 --- a/hub/internal/mq/storage.go +++ b/hub/internal/mq/storage.go @@ -6,11 +6,11 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" - agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub" + hubmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/hub" coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" ) -func (svc *Service) StorageCreatePackage(msg *agtmq.StorageCreatePackage) (*agtmq.StorageCreatePackageResp, *mq.CodeMessage) { +func (svc *Service) StorageCreatePackage(msg *hubmq.StorageCreatePackage) (*hubmq.StorageCreatePackageResp, *mq.CodeMessage) { coorCli, err := stgglb.CoordinatorMQPool.Acquire() if err != nil { logger.Warnf("new coordinator client: %s", err.Error()) @@ -19,7 +19,7 @@ func (svc *Service) StorageCreatePackage(msg *agtmq.StorageCreatePackage) (*agtm } defer stgglb.CoordinatorMQPool.Release(coorCli) - pub, err := svc.stgAgts.GetPublicStore(msg.StorageID) + pub, err := svc.stgHubs.GetPublicStore(msg.StorageID) if err != nil { return nil, mq.Failed(errorcode.OperationFailed, err.Error()) } @@ -54,6 +54,6 @@ func (svc *Service) StorageCreatePackage(msg *agtmq.StorageCreatePackage) (*agtm return nil, mq.Failed(errorcode.OperationFailed, err.Error()) } - return mq.ReplyOK(agtmq.RespStorageCreatePackage(createResp.Package)) + return mq.ReplyOK(hubmq.RespStorageCreatePackage(createResp.Package)) } */ diff --git a/hub/internal/task/cache_move_package.go b/hub/internal/task/cache_move_package.go index 8e3843b..0d0a433 100644 --- a/hub/internal/task/cache_move_package.go +++ b/hub/internal/task/cache_move_package.go @@ -40,7 +40,7 @@ func (t *CacheMovePackage) do(ctx TaskContext) error { log.Debugf("begin with %v", logger.FormatStruct(t)) defer log.Debugf("end") - store, err := ctx.stgAgts.GetShardStore(t.storageID) + store, err := ctx.stgHubs.GetShardStore(t.storageID) if err != nil { return fmt.Errorf("get shard store of storage %v: %w", t.storageID, err) } diff --git a/hub/internal/task/task.go b/hub/internal/task/task.go index b1c4bdf..146a4b7 100644 --- a/hub/internal/task/task.go +++ b/hub/internal/task/task.go @@ -7,7 +7,7 @@ import ( "gitlink.org.cn/cloudream/storage2/common/pkgs/accessstat" "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/hubpool" "gitlink.org.cn/cloudream/storage2/common/pkgs/uploader" ) @@ -17,7 +17,7 @@ type TaskContext struct { connectivity *connectivity.Collector downloader *downloader.Downloader accessStat *accessstat.AccessStat - stgAgts *agtpool.HubPool + stgHubs *hubpool.HubPool uploader *uploader.Uploader } @@ -36,13 +36,13 @@ type Task = task.Task[TaskContext] // CompleteOption 类型定义了任务完成时的选项,可用于定制化任务完成的处理方式 type CompleteOption = task.CompleteOption -func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgAgts *agtpool.HubPool, uploader *uploader.Uploader) Manager { +func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector, downloader *downloader.Downloader, accessStat *accessstat.AccessStat, stgHubs *hubpool.HubPool, uploader *uploader.Uploader) Manager { return task.NewManager(TaskContext{ distlock: distlock, connectivity: connectivity, downloader: downloader, accessStat: accessStat, - stgAgts: stgAgts, + stgHubs: stgHubs, uploader: uploader, }) } diff --git a/hub/internal/tickevent/report_hub_stats.go b/hub/internal/tickevent/report_hub_stats.go index 66a4339..deaf697 100644 --- a/hub/internal/tickevent/report_hub_stats.go +++ b/hub/internal/tickevent/report_hub_stats.go @@ -5,7 +5,7 @@ import ( "gitlink.org.cn/cloudream/common/utils/math2" stgglb "gitlink.org.cn/cloudream/storage2/common/globals" stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/hubpool" "gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent" ) @@ -35,7 +35,7 @@ func ReportHubTransferStats(evtPub *sysevent.Publisher) { } } -func ReportHubStorageTransferStats(stgAgts *agtpool.HubPool, evtPub *sysevent.Publisher) { +func ReportHubStorageTransferStats(stgHubs *hubpool.HubPool, evtPub *sysevent.Publisher) { if stgglb.Stats.HubStorageTransfer == nil { return } diff --git a/hub/internal/tickevent/report_storage_stats.go b/hub/internal/tickevent/report_storage_stats.go index 6858e0e..662b66e 100644 --- a/hub/internal/tickevent/report_storage_stats.go +++ b/hub/internal/tickevent/report_storage_stats.go @@ -3,12 +3,12 @@ package tickevent /* import ( stgmod "gitlink.org.cn/cloudream/storage2/common/models" - "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" + "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/hubpool" "gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent" ) -func ReportStorageStats(agtPool *agtpool.HubPool, evtPub *sysevent.Publisher) { - stgs := agtPool.GetAllHubs() +func ReportStorageStats(hubPool *hubpool.HubPool, evtPub *sysevent.Publisher) { + stgs := hubPool.GetAllHubs() for _, stg := range stgs { shard, err := stg.GetShardStore() if err != nil {