diff --git a/client/internal/cmdline/serve.go b/client/internal/cmdline/serve.go index 50fd85f..7f9a567 100644 --- a/client/internal/cmdline/serve.go +++ b/client/internal/cmdline/serve.go @@ -67,8 +67,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { } stgglb.InitLocal(config.Cfg().Local) - stgglb.InitMQPool(config.Cfg().RabbitMQ) - stgglb.InitHubRPCPool(config.Cfg().HubRPC) + stgglb.InitPools(&config.Cfg().HubRPC, &config.Cfg().CoordinatorRPC) // 数据库 db, err := db.NewDB(&config.Cfg().DB) diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go index 94c0f55..48efd7e 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -80,8 +80,7 @@ func test(configPath string) { } stgglb.InitLocal(config.Cfg().Local) - stgglb.InitMQPool(config.Cfg().RabbitMQ) - stgglb.InitHubRPCPool(config.Cfg().HubRPC) + stgglb.InitPools(&config.Cfg().HubRPC, &config.Cfg().CoordinatorRPC) // 数据库 db, err := db.NewDB(&config.Cfg().DB) diff --git a/client/internal/cmdline/vfstest.go b/client/internal/cmdline/vfstest.go index 443af32..5ceeee2 100644 --- a/client/internal/cmdline/vfstest.go +++ b/client/internal/cmdline/vfstest.go @@ -60,8 +60,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) { } stgglb.InitLocal(config.Cfg().Local) - stgglb.InitMQPool(config.Cfg().RabbitMQ) - stgglb.InitHubRPCPool(config.Cfg().HubRPC) + stgglb.InitPools(&config.Cfg().HubRPC, &config.Cfg().CoordinatorRPC) // 数据库 db, err := db.NewDB(&config.Cfg().DB) diff --git a/client/internal/config/config.go b/client/internal/config/config.go index d8841dc..82d4fc1 100644 --- a/client/internal/config/config.go +++ b/client/internal/config/config.go @@ -12,12 +12,14 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/ticktock" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity" + corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" ) type Config struct { Local stgglb.LocalMachineInfo `json:"local"` HubRPC hubrpc.PoolConfig `json:"hubRPC"` + CoordinatorRPC corrpc.PoolConfig `json:"coordinatorRPC"` Logger logger.Config `json:"logger"` DB db.Config `json:"db"` RabbitMQ mq.Config `json:"rabbitMQ"` diff --git a/client/internal/metacache/connectivity.go b/client/internal/metacache/connectivity.go index 4c415ae..9d863a1 100644 --- a/client/internal/metacache/connectivity.go +++ b/client/internal/metacache/connectivity.go @@ -1,12 +1,13 @@ package metacache import ( + "context" "sync" "time" "gitlink.org.cn/cloudream/common/pkgs/logger" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" - coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator" + corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) @@ -60,16 +61,13 @@ func (c *Connectivity) ClearOutdated() { } func (c *Connectivity) load(hubID cortypes.HubID) { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - logger.Warnf("new coordinator client: %v", err) - return - } - defer stgglb.CoordinatorMQPool.Release(coorCli) + coorCli := stgglb.CoordinatorRPCPool.Get() + + defer coorCli.Release() - get, err := coorCli.GetHubConnectivities(coormq.ReqGetHubConnectivities([]cortypes.HubID{hubID})) - if err != nil { - logger.Warnf("get hub connectivities: %v", err) + get, cerr := coorCli.GetHubConnectivities(context.Background(), corrpc.ReqGetHubConnectivities([]cortypes.HubID{hubID})) + if cerr != nil { + logger.Warnf("get hub connectivities: %v", cerr) return } diff --git a/client/internal/metacache/hubmeta.go b/client/internal/metacache/hubmeta.go index 13ff59e..7442b69 100644 --- a/client/internal/metacache/hubmeta.go +++ b/client/internal/metacache/hubmeta.go @@ -1,11 +1,12 @@ package metacache import ( + "context" "time" "gitlink.org.cn/cloudream/common/pkgs/logger" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" - coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator" + corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) @@ -51,16 +52,12 @@ func (h *HubMeta) load(keys []cortypes.HubID) ([]cortypes.Hub, []bool) { vs := make([]cortypes.Hub, len(keys)) oks := make([]bool, len(keys)) - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - logger.Warnf("new coordinator client: %v", err) - return vs, oks - } - defer stgglb.CoordinatorMQPool.Release(coorCli) + coorCli := stgglb.CoordinatorRPCPool.Get() + defer coorCli.Release() - get, err := coorCli.GetHubs(coormq.NewGetHubs(keys)) - if err != nil { - logger.Warnf("get hubs: %v", err) + get, cerr := coorCli.GetHubs(context.Background(), corrpc.NewGetHubs(keys)) + if cerr != nil { + logger.Warnf("get hubs: %v", cerr) return vs, oks } diff --git a/client/internal/metacache/storagemeta.go b/client/internal/metacache/storagemeta.go index 5922b61..3e5f287 100644 --- a/client/internal/metacache/storagemeta.go +++ b/client/internal/metacache/storagemeta.go @@ -1,12 +1,13 @@ package metacache import ( + "context" "time" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/jcs-pub/client/types" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator" + corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) @@ -61,21 +62,17 @@ func (s *UserSpaceMeta) load(keys []types.UserSpaceID) ([]types.UserSpaceDetail, return vs, oks } - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - logger.Warnf("new coordinator client: %v", err) - return vs, oks - } - defer stgglb.CoordinatorMQPool.Release(coorCli) + coorCli := stgglb.CoordinatorRPCPool.Get() + defer coorCli.Release() stgIDs := make([]cortypes.StorageID, len(spaces)) for i := range spaces { stgIDs[i] = spaces[i].StorageID } - getStgs, err := coorCli.GetStorageDetails(coordinator.ReqGetStorageDetails(stgIDs)) - if err != nil { - logger.Warnf("get storage details: %v", err) + getStgs, cerr := coorCli.GetStorageDetails(context.Background(), corrpc.ReqGetStorageDetails(stgIDs)) + if cerr != nil { + logger.Warnf("get storage details: %v", cerr) return vs, oks } diff --git a/client/internal/services/user_space.go b/client/internal/services/user_space.go index e3fa1c9..246bad2 100644 --- a/client/internal/services/user_space.go +++ b/client/internal/services/user_space.go @@ -40,11 +40,8 @@ func (svc *UserSpaceService) GetByName(name string) (clitypes.UserSpace, error) } func (svc *UserSpaceService) LoadPackage(packageID clitypes.PackageID, userspaceID clitypes.UserSpaceID, rootPath string) error { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return fmt.Errorf("new coordinator client: %w", err) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) + coorCli := stgglb.CoordinatorRPCPool.Get() + defer coorCli.Release() destStg := svc.UserSpaceMeta.Get(userspaceID) if destStg == nil { @@ -165,7 +162,7 @@ func (svc *UserSpaceService) SpaceToSpace(srcSpaceID clitypes.UserSpaceID, srcPa Path: srcPath, }) if cerr != nil { - return clitypes.SpaceToSpaceResult{}, fmt.Errorf("list all from source userspace: %w", cerr) + return clitypes.SpaceToSpaceResult{}, fmt.Errorf("list all from source userspace: %w", cerr.ToError()) } srcPathComps := clitypes.SplitObjectPath(srcPath) diff --git a/client/internal/ticktock/change_redundancy.go b/client/internal/ticktock/change_redundancy.go index a10f895..305169c 100644 --- a/client/internal/ticktock/change_redundancy.go +++ b/client/internal/ticktock/change_redundancy.go @@ -48,6 +48,9 @@ func (j *ChangeRedundancy) Execute(t *TickTock) { if space == nil { continue } + if space.MasterHub == nil { + continue + } ctx.allUserSpaces[space.UserSpace.UserSpaceID] = &userSpaceLoadInfo{ UserSpace: space, diff --git a/client/internal/ticktock/shardstore_gc.go b/client/internal/ticktock/shardstore_gc.go index 8dafb6b..ac35fc2 100644 --- a/client/internal/ticktock/shardstore_gc.go +++ b/client/internal/ticktock/shardstore_gc.go @@ -103,7 +103,7 @@ func (j *ShardStoreGC) gcOne(t *TickTock, space *types.UserSpaceDetail) error { Availables: allFileHashes, }) if cerr != nil { - return fmt.Errorf("request to cache gc: %w", cerr) + return fmt.Errorf("request to cache gc: %w", cerr.ToError()) } return nil } diff --git a/client/internal/ticktock/update_package_access_stat_amount.go b/client/internal/ticktock/update_package_access_stat_amount.go index 3e8de65..1fd6fea 100644 --- a/client/internal/ticktock/update_package_access_stat_amount.go +++ b/client/internal/ticktock/update_package_access_stat_amount.go @@ -5,11 +5,9 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/reflect2" - scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event" ) type UpdatePackageAccessStatAmount struct { - *scevt.UpdatePackageAccessStatAmount } func (j *UpdatePackageAccessStatAmount) Name() string { diff --git a/client/internal/uploader/user_space_upload.go b/client/internal/uploader/user_space_upload.go index 9223aa4..e7b6dc3 100644 --- a/client/internal/uploader/user_space_upload.go +++ b/client/internal/uploader/user_space_upload.go @@ -15,7 +15,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/ops2" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator" + corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" @@ -56,17 +56,13 @@ func (u *Uploader) UserSpaceUpload(userSpaceID clitypes.UserSpaceID, rootPath st return e != nil && e.MasterHub != nil && e.UserSpace.ShardStore != nil }) - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - delPkg() - return nil, fmt.Errorf("acquiring coordinator mq client: %w", err) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) + coorCli := stgglb.CoordinatorRPCPool.Get() + defer coorCli.Release() - resp, err := coorCli.GetHubConnectivities(coordinator.ReqGetHubConnectivities([]cortypes.HubID{srcSpace.MasterHub.HubID})) - if err != nil { + resp, cerr := coorCli.GetHubConnectivities(context.Background(), corrpc.ReqGetHubConnectivities([]cortypes.HubID{srcSpace.MasterHub.HubID})) + if cerr != nil { delPkg() - return nil, fmt.Errorf("getting hub connectivities: %w", err) + return nil, fmt.Errorf("getting hub connectivities: %w", cerr.ToError()) } cons := make(map[cortypes.HubID]cortypes.HubConnectivity) @@ -115,7 +111,7 @@ func (u *Uploader) UserSpaceUpload(userSpaceID clitypes.UserSpaceID, rootPath st }) if cerr != nil { delPkg() - return nil, fmt.Errorf("listing public store: %w", cerr) + return nil, fmt.Errorf("listing public store: %w", cerr.ToError()) } adds, err := u.uploadFromPublicStore(srcSpace, &targetSapce.Space, listAllResp.Entries, rootPath) diff --git a/common/assets/confs/client.config.json b/common/assets/confs/client.config.json index 4668194..548e035 100644 --- a/common/assets/confs/client.config.json +++ b/common/assets/confs/client.config.json @@ -6,6 +6,9 @@ "locationID": 1 }, "hubRPC": {}, + "coordinatorRPC": { + "address": "127.0.0.1:5009" + }, "logger": { "output": "stdout", "level": "debug" diff --git a/common/assets/confs/coordinator.config.json b/common/assets/confs/coordinator.config.json index 86bbbd0..85fbfc4 100644 --- a/common/assets/confs/coordinator.config.json +++ b/common/assets/confs/coordinator.config.json @@ -23,5 +23,8 @@ }, "tickTock": { "hubUnavailableTime": "20s" + }, + "rpc": { + "listen": "127.0.0.1:5009" } } \ No newline at end of file diff --git a/common/assets/confs/hub.config.json b/common/assets/confs/hub.config.json index 7f44e31..2c40e0d 100644 --- a/common/assets/confs/hub.config.json +++ b/common/assets/confs/hub.config.json @@ -8,6 +8,9 @@ "rpc": { "listen": "127.0.0.1:5010" }, + "coordinatorRPC": { + "address": "127.0.0.1:5009" + }, "logger": { "output": "file", "outputFileName": "hub", diff --git a/common/globals/pools.go b/common/globals/pools.go index 2a82937..6ab89ac 100644 --- a/common/globals/pools.go +++ b/common/globals/pools.go @@ -1,33 +1,19 @@ package stgglb import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator" - scmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner" + corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" ) -var CoordinatorMQPool coormq.Pool - -var ScannerMQPool scmq.Pool - -// InitMQPool -// -// @Description: 初始化MQ连接池 -// @param cfg -func InitMQPool(cfg mq.Config) { - CoordinatorMQPool = coormq.NewPool(cfg) - - ScannerMQPool = scmq.NewPool(cfg) - -} - +var CoordinatorRPCPool *corrpc.Pool var HubRPCPool *hubrpc.Pool -// InitHubRPCPool -// -// @Description: 初始化HubRPC连接池 -// @param cfg -func InitHubRPCPool(cfg hubrpc.PoolConfig) { - HubRPCPool = hubrpc.NewPool(cfg) +func InitPools(hubRPC *hubrpc.PoolConfig, corRPC *corrpc.PoolConfig) { + if hubRPC != nil { + HubRPCPool = hubrpc.NewPool(*hubRPC) + } + + if corRPC != nil { + CoordinatorRPCPool = corrpc.NewPool(*corRPC) + } } diff --git a/common/pkgs/connectivity/collector.go b/common/pkgs/connectivity/collector.go index 6b6cf63..9f5a2fd 100644 --- a/common/pkgs/connectivity/collector.go +++ b/common/pkgs/connectivity/collector.go @@ -8,7 +8,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" - coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator" + corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) @@ -121,14 +121,11 @@ func (r *Collector) testing() { log := logger.WithType[Collector]("") log.Debug("do testing") - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - return - } - defer stgglb.CoordinatorMQPool.Release(coorCli) + coorCli := stgglb.CoordinatorRPCPool.Get() + defer coorCli.Release() - getHubResp, err := coorCli.GetHubs(coormq.NewGetHubs(nil)) - if err != nil { + getHubResp, cerr := coorCli.GetHubs(context.Background(), corrpc.NewGetHubs(nil)) + if cerr != nil { return } diff --git a/common/pkgs/mq/consts.go b/common/pkgs/mq/consts.go deleted file mode 100644 index d7562d4..0000000 --- a/common/pkgs/mq/consts.go +++ /dev/null @@ -1,13 +0,0 @@ -package mq - -import "fmt" - -const ( - COORDINATOR_QUEUE_NAME = "Coordinator" - SCANNER_QUEUE_NAME = "Scanner" - DATAMAP_QUEUE_NAME = "DataMap" -) - -func MakeHubQueueName(id int64) string { - return fmt.Sprintf("Hub@%d", id) -} diff --git a/common/pkgs/mq/coordinator/client.go b/common/pkgs/mq/coordinator/client.go deleted file mode 100644 index 01326aa..0000000 --- a/common/pkgs/mq/coordinator/client.go +++ /dev/null @@ -1,60 +0,0 @@ -package coordinator - -import ( - "sync" - - "gitlink.org.cn/cloudream/common/pkgs/mq" - stgmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq" -) - -type Client struct { - rabbitCli *mq.RabbitMQTransport -} - -func NewClient(cfg mq.Config) (*Client, error) { - rabbitCli, err := mq.NewRabbitMQTransport(cfg, stgmq.COORDINATOR_QUEUE_NAME, "") - if err != nil { - return nil, err - } - - return &Client{ - rabbitCli: rabbitCli, - }, nil -} - -func (c *Client) Close() { - c.rabbitCli.Close() -} - -type Pool interface { - Acquire() (*Client, error) - Release(cli *Client) -} - -type pool struct { - mqcfg mq.Config - shared *Client - lock sync.Mutex -} - -func NewPool(mqcfg mq.Config) Pool { - return &pool{ - mqcfg: mqcfg, - } -} -func (p *pool) Acquire() (*Client, error) { - p.lock.Lock() - defer p.lock.Unlock() - if p.shared == nil { - var err error - p.shared, err = NewClient(p.mqcfg) - if err != nil { - return nil, err - } - } - - return p.shared, nil -} - -func (p *pool) Release(cli *Client) { -} diff --git a/common/pkgs/mq/coordinator/coordinator_test.go b/common/pkgs/mq/coordinator/coordinator_test.go deleted file mode 100644 index 3e5d87a..0000000 --- a/common/pkgs/mq/coordinator/coordinator_test.go +++ /dev/null @@ -1,15 +0,0 @@ -package coordinator - -import ( - "testing" - - . "github.com/smartystreets/goconvey/convey" -) - -func TestSerder(t *testing.T) { - Convey("输出注册的Handler", t, func() { - for k, _ := range msgDispatcher.Handlers { - t.Logf("(%s)", k) - } - }) -} diff --git a/common/pkgs/mq/coordinator/server.go b/common/pkgs/mq/coordinator/server.go deleted file mode 100644 index 06214f4..0000000 --- a/common/pkgs/mq/coordinator/server.go +++ /dev/null @@ -1,72 +0,0 @@ -package coordinator - -import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - "gitlink.org.cn/cloudream/common/utils/sync2" - mymq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq" -) - -// Service 协调端接口 -type Service interface { - HubService - - StorageService -} - -type Server struct { - service Service - rabbitSvr mq.RabbitMQServer -} - -func NewServer(svc Service, cfg mq.Config) (*Server, error) { - srv := &Server{ - service: svc, - } - - rabbitSvr, err := mq.NewRabbitMQServer( - cfg, - mymq.COORDINATOR_QUEUE_NAME, - func(msg *mq.Message) (*mq.Message, error) { - return msgDispatcher.Handle(srv.service, msg) - }, - ) - if err != nil { - return nil, err - } - - srv.rabbitSvr = *rabbitSvr - - return srv, nil -} -func (s *Server) Stop() { - s.rabbitSvr.Close() -} - -func (s *Server) Start(cfg mq.Config) *sync2.UnboundChannel[mq.RabbitMQServerEvent] { - return s.rabbitSvr.Start() -} - -func (s *Server) OnError(callback func(error)) { - s.rabbitSvr.OnError = callback -} - -var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher() - -// Register 将Service中的一个接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 -// TODO 需要约束:Service实现了TSvc接口 -func Register[TReq mq.MessageBody, TResp mq.MessageBody](svcFn func(svc Service, msg TReq) (TResp, *mq.CodeMessage)) any { - mq.AddServiceFn(&msgDispatcher, svcFn) - mq.RegisterMessage[TReq]() - mq.RegisterMessage[TResp]() - - return nil -} - -// RegisterNoReply 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 -// TODO 需要约束:Service实现了TSvc接口 -func RegisterNoReply[TReq mq.MessageBody](svcFn func(svc Service, msg TReq)) any { - mq.AddNoRespServiceFn(&msgDispatcher, svcFn) - mq.RegisterMessage[TReq]() - - return nil -} diff --git a/common/pkgs/mq/coordinator/storage.go b/common/pkgs/mq/coordinator/storage.go deleted file mode 100644 index fc77423..0000000 --- a/common/pkgs/mq/coordinator/storage.go +++ /dev/null @@ -1,36 +0,0 @@ -package coordinator - -import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" -) - -type StorageService interface { - GetStorageDetails(msg *GetStorageDetails) (*GetStorageDetailsResp, *mq.CodeMessage) -} - -// 获取Storage信息 -var _ = Register(Service.GetStorageDetails) - -type GetStorageDetails struct { - mq.MessageBodyBase - StorageIDs []cortypes.StorageID `json:"storageIDs"` -} -type GetStorageDetailsResp struct { - mq.MessageBodyBase - Storage []*cortypes.StorageDetail `json:"storages"` -} - -func ReqGetStorageDetails(storageIDs []cortypes.StorageID) *GetStorageDetails { - return &GetStorageDetails{ - StorageIDs: storageIDs, - } -} -func RespGetStorageDetails(stgs []*cortypes.StorageDetail) *GetStorageDetailsResp { - return &GetStorageDetailsResp{ - Storage: stgs, - } -} -func (client *Client) GetStorageDetails(msg *GetStorageDetails) (*GetStorageDetailsResp, error) { - return mq.Request(Service.GetStorageDetails, client.rabbitCli, msg) -} diff --git a/common/pkgs/mq/scanner/client.go b/common/pkgs/mq/scanner/client.go deleted file mode 100644 index de92534..0000000 --- a/common/pkgs/mq/scanner/client.go +++ /dev/null @@ -1,60 +0,0 @@ -package scanner - -import ( - "sync" - - "gitlink.org.cn/cloudream/common/pkgs/mq" - stgmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq" -) - -type Client struct { - rabbitCli *mq.RabbitMQTransport -} - -func NewClient(cfg mq.Config) (*Client, error) { - rabbitCli, err := mq.NewRabbitMQTransport(cfg, stgmq.SCANNER_QUEUE_NAME, "") - if err != nil { - return nil, err - } - - return &Client{ - rabbitCli: rabbitCli, - }, nil -} - -func (c *Client) Close() { - c.rabbitCli.Close() -} - -type Pool interface { - Acquire() (*Client, error) - Release(cli *Client) -} - -type pool struct { - mqcfg mq.Config - shared *Client - lock sync.Mutex -} - -func NewPool(mqcfg mq.Config) Pool { - return &pool{ - mqcfg: mqcfg, - } -} -func (p *pool) Acquire() (*Client, error) { - p.lock.Lock() - defer p.lock.Unlock() - if p.shared == nil { - var err error - p.shared, err = NewClient(p.mqcfg) - if err != nil { - return nil, err - } - } - - return p.shared, nil -} - -func (p *pool) Release(cli *Client) { -} diff --git a/common/pkgs/mq/scanner/event.go b/common/pkgs/mq/scanner/event.go deleted file mode 100644 index 7759240..0000000 --- a/common/pkgs/mq/scanner/event.go +++ /dev/null @@ -1,31 +0,0 @@ -package scanner - -import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event" -) - -type EventService interface { - PostEvent(event *PostEvent) -} - -// 投递Event -var _ = RegisterNoReply(Service.PostEvent) - -type PostEvent struct { - mq.MessageBodyBase - Event scevt.Event `json:"event"` - IsEmergency bool `json:"isEmergency"` // 重要消息,优先处理 - DontMerge bool `json:"dontMerge"` // 不可合并此消息 -} - -func NewPostEvent(event scevt.Event, isEmergency bool, dontMerge bool) *PostEvent { - return &PostEvent{ - Event: event, - IsEmergency: isEmergency, - DontMerge: dontMerge, - } -} -func (client *Client) PostEvent(msg *PostEvent) error { - return mq.Send(Service.PostEvent, client.rabbitCli, msg) -} diff --git a/common/pkgs/mq/scanner/event/agent_check_shardstore.go b/common/pkgs/mq/scanner/event/agent_check_shardstore.go deleted file mode 100644 index a7ff49c..0000000 --- a/common/pkgs/mq/scanner/event/agent_check_shardstore.go +++ /dev/null @@ -1,18 +0,0 @@ -package event - -import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - -type HubCheckShardStore struct { - EventBase - StorageID cdssdk.StorageID `json:"storageID"` -} - -func NewHubCheckShardStore(stgID cdssdk.StorageID) *HubCheckShardStore { - return &HubCheckShardStore{ - StorageID: stgID, - } -} - -func init() { - Register[*HubCheckShardStore]() -} diff --git a/common/pkgs/mq/scanner/event/agent_check_state.go b/common/pkgs/mq/scanner/event/agent_check_state.go deleted file mode 100644 index 529e0cb..0000000 --- a/common/pkgs/mq/scanner/event/agent_check_state.go +++ /dev/null @@ -1,18 +0,0 @@ -package event - -import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - -type HubCheckState struct { - EventBase - HubID cdssdk.HubID `json:"hubID"` -} - -func NewHubCheckState(hubID cdssdk.HubID) *HubCheckState { - return &HubCheckState{ - HubID: hubID, - } -} - -func init() { - Register[*HubCheckState]() -} diff --git a/common/pkgs/mq/scanner/event/agent_check_storage.go b/common/pkgs/mq/scanner/event/agent_check_storage.go deleted file mode 100644 index d3f8c0f..0000000 --- a/common/pkgs/mq/scanner/event/agent_check_storage.go +++ /dev/null @@ -1,18 +0,0 @@ -package event - -import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - -type HubCheckStorage struct { - EventBase - StorageID cdssdk.StorageID `json:"storageID"` -} - -func NewHubCheckStorage(storageID cdssdk.StorageID) *HubCheckStorage { - return &HubCheckStorage{ - StorageID: storageID, - } -} - -func init() { - Register[*HubCheckStorage]() -} diff --git a/common/pkgs/mq/scanner/event/agent_shardstore_gc.go b/common/pkgs/mq/scanner/event/agent_shardstore_gc.go deleted file mode 100644 index e4a7f62..0000000 --- a/common/pkgs/mq/scanner/event/agent_shardstore_gc.go +++ /dev/null @@ -1,18 +0,0 @@ -package event - -import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - -type HubShardStoreGC struct { - EventBase - StorageID cdssdk.StorageID `json:"storageID"` -} - -func NewHubShardStoreGC(stgID cdssdk.StorageID) *HubShardStoreGC { - return &HubShardStoreGC{ - StorageID: stgID, - } -} - -func init() { - Register[*HubShardStoreGC]() -} diff --git a/common/pkgs/mq/scanner/event/agent_storage_gc.go b/common/pkgs/mq/scanner/event/agent_storage_gc.go deleted file mode 100644 index 683c325..0000000 --- a/common/pkgs/mq/scanner/event/agent_storage_gc.go +++ /dev/null @@ -1,18 +0,0 @@ -package event - -import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - -type HubStorageGC struct { - EventBase - StorageID cdssdk.StorageID `json:"storageID"` -} - -func NewHubStorageGC(storageID cdssdk.StorageID) *HubStorageGC { - return &HubStorageGC{ - StorageID: storageID, - } -} - -func init() { - Register[*HubStorageGC]() -} diff --git a/common/pkgs/mq/scanner/event/check_package.go b/common/pkgs/mq/scanner/event/check_package.go deleted file mode 100644 index f6bdea8..0000000 --- a/common/pkgs/mq/scanner/event/check_package.go +++ /dev/null @@ -1,18 +0,0 @@ -package event - -import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - -type CheckPackage struct { - EventBase - PackageIDs []cdssdk.PackageID `json:"packageIDs"` -} - -func NewCheckPackage(packageIDs []cdssdk.PackageID) *CheckPackage { - return &CheckPackage{ - PackageIDs: packageIDs, - } -} - -func init() { - Register[*CheckPackage]() -} diff --git a/common/pkgs/mq/scanner/event/check_package_redundancy.go b/common/pkgs/mq/scanner/event/check_package_redundancy.go deleted file mode 100644 index 1c7687a..0000000 --- a/common/pkgs/mq/scanner/event/check_package_redundancy.go +++ /dev/null @@ -1,18 +0,0 @@ -package event - -import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - -type CheckPackageRedundancy struct { - EventBase - PackageID cdssdk.PackageID `json:"packageIDs"` -} - -func NewCheckPackageRedundancy(packageID cdssdk.PackageID) *CheckPackageRedundancy { - return &CheckPackageRedundancy{ - PackageID: packageID, - } -} - -func init() { - Register[*CheckPackageRedundancy]() -} diff --git a/common/pkgs/mq/scanner/event/clean_pinned.go b/common/pkgs/mq/scanner/event/clean_pinned.go deleted file mode 100644 index 1e1baea..0000000 --- a/common/pkgs/mq/scanner/event/clean_pinned.go +++ /dev/null @@ -1,18 +0,0 @@ -package event - -import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - -type CleanPinned struct { - EventBase - PackageID cdssdk.PackageID `json:"hubID"` -} - -func NewCleanPinned(packageID cdssdk.PackageID) *CleanPinned { - return &CleanPinned{ - PackageID: packageID, - } -} - -func init() { - Register[*CleanPinned]() -} diff --git a/common/pkgs/mq/scanner/event/event.go b/common/pkgs/mq/scanner/event/event.go deleted file mode 100644 index 7885c72..0000000 --- a/common/pkgs/mq/scanner/event/event.go +++ /dev/null @@ -1,23 +0,0 @@ -package event - -import ( - "gitlink.org.cn/cloudream/common/pkgs/types" - "gitlink.org.cn/cloudream/common/utils/reflect2" - "gitlink.org.cn/cloudream/common/utils/serder" -) - -type Event interface { - Noop() -} - -var EventTypeUnino = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[Event]())) - -type EventBase struct{} - -func (e *EventBase) Noop() {} - -// 只能在init函数中调用,因为包级变量初始化比init函数调用先进行 -func Register[T Event]() any { - EventTypeUnino.Add(reflect2.TypeOf[T]()) - return nil -} diff --git a/common/pkgs/mq/scanner/event/update_package_access_stat_amount.go b/common/pkgs/mq/scanner/event/update_package_access_stat_amount.go deleted file mode 100644 index e7fade6..0000000 --- a/common/pkgs/mq/scanner/event/update_package_access_stat_amount.go +++ /dev/null @@ -1,18 +0,0 @@ -package event - -import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - -type UpdatePackageAccessStatAmount struct { - EventBase - PackageIDs []cdssdk.PackageID `json:"packageIDs"` -} - -func NewUpdatePackageAccessStatAmount(packageIDs []cdssdk.PackageID) *UpdatePackageAccessStatAmount { - return &UpdatePackageAccessStatAmount{ - PackageIDs: packageIDs, - } -} - -func init() { - Register[*UpdatePackageAccessStatAmount]() -} diff --git a/common/pkgs/mq/scanner/server.go b/common/pkgs/mq/scanner/server.go deleted file mode 100644 index 5be32b1..0000000 --- a/common/pkgs/mq/scanner/server.go +++ /dev/null @@ -1,70 +0,0 @@ -package scanner - -import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" - "gitlink.org.cn/cloudream/common/utils/sync2" - mymq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq" -) - -// Service 协调端接口 -type Service interface { - EventService -} -type Server struct { - service Service - rabbitSvr mq.RabbitMQServer -} - -func NewServer(svc Service, cfg mq.Config) (*Server, error) { - srv := &Server{ - service: svc, - } - - rabbitSvr, err := mq.NewRabbitMQServer( - cfg, - mymq.SCANNER_QUEUE_NAME, - func(msg *mq.Message) (*mq.Message, error) { - return msgDispatcher.Handle(srv.service, msg) - }, - ) - if err != nil { - return nil, err - } - - srv.rabbitSvr = *rabbitSvr - - return srv, nil -} - -func (s *Server) Stop() { - s.rabbitSvr.Close() -} - -func (s *Server) Start() *sync2.UnboundChannel[mq.RabbitMQServerEvent] { - return s.rabbitSvr.Start() -} - -func (s *Server) OnError(callback func(error)) { - s.rabbitSvr.OnError = callback -} - -var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher() - -// Register 将Service中的一个接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 -// TODO 需要约束:Service实现了TSvc接口 -func Register[TReq mq.MessageBody, TResp mq.MessageBody](svcFn func(svc Service, msg TReq) (TResp, *mq.CodeMessage)) any { - mq.AddServiceFn(&msgDispatcher, svcFn) - mq.RegisterMessage[TReq]() - mq.RegisterMessage[TResp]() - - return nil -} - -// RegisterNoReply 将Service中的一个*没有返回值的*接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 -// TODO 需要约束:Service实现了TSvc接口 -func RegisterNoReply[TReq mq.MessageBody](svcFn func(svc Service, msg TReq)) any { - mq.AddNoRespServiceFn(&msgDispatcher, svcFn) - mq.RegisterMessage[TReq]() - - return nil -} diff --git a/common/pkgs/rpc/coordinator/client.go b/common/pkgs/rpc/coordinator/client.go new file mode 100644 index 0000000..ebdfd0b --- /dev/null +++ b/common/pkgs/rpc/coordinator/client.go @@ -0,0 +1,22 @@ +package corrpc + +import ( + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" + "google.golang.org/grpc" +) + +type Client struct { + con *grpc.ClientConn + cli CoordinatorClient + pool *Pool + fusedErr *rpc.CodeError +} + +func (c *Client) Release() { + if c.con != nil { + c.pool.release() + } +} + +// 客户端的API要和服务端的API保持一致 +var _ CoordinatorAPI = (*Client)(nil) diff --git a/common/pkgs/rpc/coordinator/coordinator.pb.go b/common/pkgs/rpc/coordinator/coordinator.pb.go new file mode 100644 index 0000000..710bfd8 --- /dev/null +++ b/common/pkgs/rpc/coordinator/coordinator.pb.go @@ -0,0 +1,92 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.34.2 +// protoc v4.22.3 +// source: pkgs/rpc/coordinator/coordinator.proto + +package corrpc + +import ( + rpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +var File_pkgs_rpc_coordinator_coordinator_proto protoreflect.FileDescriptor + +var file_pkgs_rpc_coordinator_coordinator_proto_rawDesc = []byte{ + 0x0a, 0x26, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6f, 0x6f, 0x72, 0x64, + 0x69, 0x6e, 0x61, 0x74, 0x6f, 0x72, 0x2f, 0x63, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, 0x61, 0x74, + 0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x63, 0x6f, 0x72, 0x72, 0x70, 0x63, + 0x1a, 0x12, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x72, 0x70, 0x63, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x32, 0xc9, 0x01, 0x0a, 0x0b, 0x43, 0x6f, 0x6f, 0x72, 0x64, 0x69, 0x6e, + 0x61, 0x74, 0x6f, 0x72, 0x12, 0x2b, 0x0a, 0x0c, 0x47, 0x65, 0x74, 0x48, 0x75, 0x62, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x26, 0x0a, 0x07, 0x47, 0x65, 0x74, 0x48, 0x75, 0x62, 0x73, 0x12, 0x0c, 0x2e, 0x72, + 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, + 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x33, 0x0a, 0x14, 0x47, 0x65, 0x74, + 0x48, 0x75, 0x62, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x76, 0x69, 0x74, 0x69, 0x65, + 0x73, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x30, + 0x0a, 0x11, 0x47, 0x65, 0x74, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x44, 0x65, 0x74, 0x61, + 0x69, 0x6c, 0x73, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x42, 0x40, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, + 0x63, 0x6e, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x6a, 0x63, 0x73, + 0x2d, 0x70, 0x75, 0x62, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x6b, 0x67, 0x73, + 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6f, 0x72, 0x72, 0x70, 0x63, 0x3b, 0x63, 0x6f, 0x72, 0x72, + 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var file_pkgs_rpc_coordinator_coordinator_proto_goTypes = []any{ + (*rpc.Request)(nil), // 0: rpc.Request + (*rpc.Response)(nil), // 1: rpc.Response +} +var file_pkgs_rpc_coordinator_coordinator_proto_depIdxs = []int32{ + 0, // 0: corrpc.Coordinator.GetHubConfig:input_type -> rpc.Request + 0, // 1: corrpc.Coordinator.GetHubs:input_type -> rpc.Request + 0, // 2: corrpc.Coordinator.GetHubConnectivities:input_type -> rpc.Request + 0, // 3: corrpc.Coordinator.GetStorageDetails:input_type -> rpc.Request + 1, // 4: corrpc.Coordinator.GetHubConfig:output_type -> rpc.Response + 1, // 5: corrpc.Coordinator.GetHubs:output_type -> rpc.Response + 1, // 6: corrpc.Coordinator.GetHubConnectivities:output_type -> rpc.Response + 1, // 7: corrpc.Coordinator.GetStorageDetails:output_type -> rpc.Response + 4, // [4:8] is the sub-list for method output_type + 0, // [0:4] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_pkgs_rpc_coordinator_coordinator_proto_init() } +func file_pkgs_rpc_coordinator_coordinator_proto_init() { + if File_pkgs_rpc_coordinator_coordinator_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_pkgs_rpc_coordinator_coordinator_proto_rawDesc, + NumEnums: 0, + NumMessages: 0, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_pkgs_rpc_coordinator_coordinator_proto_goTypes, + DependencyIndexes: file_pkgs_rpc_coordinator_coordinator_proto_depIdxs, + }.Build() + File_pkgs_rpc_coordinator_coordinator_proto = out.File + file_pkgs_rpc_coordinator_coordinator_proto_rawDesc = nil + file_pkgs_rpc_coordinator_coordinator_proto_goTypes = nil + file_pkgs_rpc_coordinator_coordinator_proto_depIdxs = nil +} diff --git a/common/pkgs/rpc/coordinator/coordinator.proto b/common/pkgs/rpc/coordinator/coordinator.proto new file mode 100644 index 0000000..1a3d06f --- /dev/null +++ b/common/pkgs/rpc/coordinator/coordinator.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +import "pkgs/rpc/rpc.proto"; + +package corrpc; + +option go_package = "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/corrpc;corrpc"; + + +service Coordinator { + rpc GetHubConfig(rpc.Request) returns(rpc.Response); + rpc GetHubs(rpc.Request) returns(rpc.Response); + rpc GetHubConnectivities(rpc.Request) returns(rpc.Response); + + rpc GetStorageDetails(rpc.Request) returns(rpc.Response); +} \ No newline at end of file diff --git a/common/pkgs/rpc/coordinator/coordinator_grpc.pb.go b/common/pkgs/rpc/coordinator/coordinator_grpc.pb.go new file mode 100644 index 0000000..14044cf --- /dev/null +++ b/common/pkgs/rpc/coordinator/coordinator_grpc.pb.go @@ -0,0 +1,221 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.22.3 +// source: pkgs/rpc/coordinator/coordinator.proto + +package corrpc + +import ( + context "context" + rpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + Coordinator_GetHubConfig_FullMethodName = "/corrpc.Coordinator/GetHubConfig" + Coordinator_GetHubs_FullMethodName = "/corrpc.Coordinator/GetHubs" + Coordinator_GetHubConnectivities_FullMethodName = "/corrpc.Coordinator/GetHubConnectivities" + Coordinator_GetStorageDetails_FullMethodName = "/corrpc.Coordinator/GetStorageDetails" +) + +// CoordinatorClient is the client API for Coordinator service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type CoordinatorClient interface { + GetHubConfig(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) + GetHubs(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) + GetHubConnectivities(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) + GetStorageDetails(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) +} + +type coordinatorClient struct { + cc grpc.ClientConnInterface +} + +func NewCoordinatorClient(cc grpc.ClientConnInterface) CoordinatorClient { + return &coordinatorClient{cc} +} + +func (c *coordinatorClient) GetHubConfig(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) { + out := new(rpc.Response) + err := c.cc.Invoke(ctx, Coordinator_GetHubConfig_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *coordinatorClient) GetHubs(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) { + out := new(rpc.Response) + err := c.cc.Invoke(ctx, Coordinator_GetHubs_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *coordinatorClient) GetHubConnectivities(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) { + out := new(rpc.Response) + err := c.cc.Invoke(ctx, Coordinator_GetHubConnectivities_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *coordinatorClient) GetStorageDetails(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) { + out := new(rpc.Response) + err := c.cc.Invoke(ctx, Coordinator_GetStorageDetails_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// CoordinatorServer is the server API for Coordinator service. +// All implementations must embed UnimplementedCoordinatorServer +// for forward compatibility +type CoordinatorServer interface { + GetHubConfig(context.Context, *rpc.Request) (*rpc.Response, error) + GetHubs(context.Context, *rpc.Request) (*rpc.Response, error) + GetHubConnectivities(context.Context, *rpc.Request) (*rpc.Response, error) + GetStorageDetails(context.Context, *rpc.Request) (*rpc.Response, error) + mustEmbedUnimplementedCoordinatorServer() +} + +// UnimplementedCoordinatorServer must be embedded to have forward compatible implementations. +type UnimplementedCoordinatorServer struct { +} + +func (UnimplementedCoordinatorServer) GetHubConfig(context.Context, *rpc.Request) (*rpc.Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetHubConfig not implemented") +} +func (UnimplementedCoordinatorServer) GetHubs(context.Context, *rpc.Request) (*rpc.Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetHubs not implemented") +} +func (UnimplementedCoordinatorServer) GetHubConnectivities(context.Context, *rpc.Request) (*rpc.Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetHubConnectivities not implemented") +} +func (UnimplementedCoordinatorServer) GetStorageDetails(context.Context, *rpc.Request) (*rpc.Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetStorageDetails not implemented") +} +func (UnimplementedCoordinatorServer) mustEmbedUnimplementedCoordinatorServer() {} + +// UnsafeCoordinatorServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to CoordinatorServer will +// result in compilation errors. +type UnsafeCoordinatorServer interface { + mustEmbedUnimplementedCoordinatorServer() +} + +func RegisterCoordinatorServer(s grpc.ServiceRegistrar, srv CoordinatorServer) { + s.RegisterService(&Coordinator_ServiceDesc, srv) +} + +func _Coordinator_GetHubConfig_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(rpc.Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CoordinatorServer).GetHubConfig(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Coordinator_GetHubConfig_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CoordinatorServer).GetHubConfig(ctx, req.(*rpc.Request)) + } + return interceptor(ctx, in, info, handler) +} + +func _Coordinator_GetHubs_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(rpc.Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CoordinatorServer).GetHubs(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Coordinator_GetHubs_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CoordinatorServer).GetHubs(ctx, req.(*rpc.Request)) + } + return interceptor(ctx, in, info, handler) +} + +func _Coordinator_GetHubConnectivities_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(rpc.Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CoordinatorServer).GetHubConnectivities(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Coordinator_GetHubConnectivities_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CoordinatorServer).GetHubConnectivities(ctx, req.(*rpc.Request)) + } + return interceptor(ctx, in, info, handler) +} + +func _Coordinator_GetStorageDetails_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(rpc.Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CoordinatorServer).GetStorageDetails(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Coordinator_GetStorageDetails_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CoordinatorServer).GetStorageDetails(ctx, req.(*rpc.Request)) + } + return interceptor(ctx, in, info, handler) +} + +// Coordinator_ServiceDesc is the grpc.ServiceDesc for Coordinator service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Coordinator_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "corrpc.Coordinator", + HandlerType: (*CoordinatorServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetHubConfig", + Handler: _Coordinator_GetHubConfig_Handler, + }, + { + MethodName: "GetHubs", + Handler: _Coordinator_GetHubs_Handler, + }, + { + MethodName: "GetHubConnectivities", + Handler: _Coordinator_GetHubConnectivities_Handler, + }, + { + MethodName: "GetStorageDetails", + Handler: _Coordinator_GetStorageDetails_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "pkgs/rpc/coordinator/coordinator.proto", +} diff --git a/common/pkgs/mq/coordinator/hub.go b/common/pkgs/rpc/coordinator/hub.go similarity index 51% rename from common/pkgs/mq/coordinator/hub.go rename to common/pkgs/rpc/coordinator/hub.go index 0a0bc14..0254bcf 100644 --- a/common/pkgs/mq/coordinator/hub.go +++ b/common/pkgs/rpc/coordinator/hub.go @@ -1,26 +1,24 @@ -package coordinator +package corrpc import ( - "gitlink.org.cn/cloudream/common/pkgs/mq" + context "context" + + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) type HubService interface { - GetHubConfig(msg *GetHubConfig) (*GetHubConfigResp, *mq.CodeMessage) + GetHubConfig(ctx context.Context, msg *GetHubConfig) (*GetHubConfigResp, *rpc.CodeError) - GetHubs(msg *GetHubs) (*GetHubsResp, *mq.CodeMessage) + GetHubs(ctx context.Context, msg *GetHubs) (*GetHubsResp, *rpc.CodeError) - GetHubConnectivities(msg *GetHubConnectivities) (*GetHubConnectivitiesResp, *mq.CodeMessage) + GetHubConnectivities(ctx context.Context, msg *GetHubConnectivities) (*GetHubConnectivitiesResp, *rpc.CodeError) } -var _ = Register(Service.GetHubConfig) - type GetHubConfig struct { - mq.MessageBodyBase HubID cortypes.HubID `json:"hubID"` } type GetHubConfigResp struct { - mq.MessageBodyBase Hub cortypes.Hub `json:"hub"` } @@ -34,19 +32,18 @@ func RespGetHubConfig(hub cortypes.Hub) *GetHubConfigResp { Hub: hub, } } -func (client *Client) GetHubConfig(msg *GetHubConfig) (*GetHubConfigResp, error) { - return mq.Request(Service.GetHubConfig, client.rabbitCli, msg) +func (c *Client) GetHubConfig(ctx context.Context, msg *GetHubConfig) (*GetHubConfigResp, *rpc.CodeError) { + return rpc.UnaryClient[*GetHubConfigResp](c.cli.GetHubConfig, ctx, msg) +} +func (s *Server) GetHubConfig(ctx context.Context, req *rpc.Request) (*rpc.Response, error) { + return rpc.UnaryServer(s.svrImpl.GetHubConfig, ctx, req) } // 获取指定节点的信息。如果HubIDs为nil,则返回所有Hub -var _ = Register(Service.GetHubs) - type GetHubs struct { - mq.MessageBodyBase HubIDs []cortypes.HubID `json:"hubIDs"` } type GetHubsResp struct { - mq.MessageBodyBase Hubs []*cortypes.Hub `json:"hubs"` } @@ -69,19 +66,19 @@ func (r *GetHubsResp) GetHub(id cortypes.HubID) *cortypes.Hub { return nil } -func (client *Client) GetHubs(msg *GetHubs) (*GetHubsResp, error) { - return mq.Request(Service.GetHubs, client.rabbitCli, msg) +func (c *Client) GetHubs(ctx context.Context, msg *GetHubs) (*GetHubsResp, *rpc.CodeError) { + return rpc.UnaryClient[*GetHubsResp](c.cli.GetHubs, ctx, msg) +} +func (s *Server) GetHubs(ctx context.Context, req *rpc.Request) (*rpc.Response, error) { + return rpc.UnaryServer(s.svrImpl.GetHubs, ctx, req) } // 获取节点连通性信息 -var _ = Register(Service.GetHubConnectivities) type GetHubConnectivities struct { - mq.MessageBodyBase HubIDs []cortypes.HubID `json:"hubIDs"` } type GetHubConnectivitiesResp struct { - mq.MessageBodyBase Connectivities []cortypes.HubConnectivity `json:"hubs"` } @@ -95,6 +92,9 @@ func RespGetHubConnectivities(cons []cortypes.HubConnectivity) *GetHubConnectivi Connectivities: cons, } } -func (client *Client) GetHubConnectivities(msg *GetHubConnectivities) (*GetHubConnectivitiesResp, error) { - return mq.Request(Service.GetHubConnectivities, client.rabbitCli, msg) +func (c *Client) GetHubConnectivities(ctx context.Context, msg *GetHubConnectivities) (*GetHubConnectivitiesResp, *rpc.CodeError) { + return rpc.UnaryClient[*GetHubConnectivitiesResp](c.cli.GetHubConnectivities, ctx, msg) +} +func (s *Server) GetHubConnectivities(ctx context.Context, req *rpc.Request) (*rpc.Response, error) { + return rpc.UnaryServer(s.svrImpl.GetHubConnectivities, ctx, req) } diff --git a/common/pkgs/rpc/coordinator/pool.go b/common/pkgs/rpc/coordinator/pool.go new file mode 100644 index 0000000..28fbbd1 --- /dev/null +++ b/common/pkgs/rpc/coordinator/pool.go @@ -0,0 +1,96 @@ +package corrpc + +import ( + "sync" + "time" + + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" + grpc "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type PoolConfig struct { + Address string `json:"address"` +} + +type Pool struct { + cfg PoolConfig + grpcCon *grpcCon + lock sync.Mutex +} + +type grpcCon struct { + grpcCon *grpc.ClientConn + refCount int + stopClosing chan any +} + +func NewPool(cfg PoolConfig) *Pool { + return &Pool{ + cfg: cfg, + } +} + +func (p *Pool) Get() *Client { + p.lock.Lock() + defer p.lock.Unlock() + + con := p.grpcCon + if con == nil { + gcon, err := grpc.NewClient(p.cfg.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return &Client{ + con: nil, + pool: p, + fusedErr: rpc.Failed(errorcode.OperationFailed, err.Error()), + } + } + + con = &grpcCon{ + grpcCon: gcon, + refCount: 0, + stopClosing: nil, + } + + p.grpcCon = con + } + + con.refCount++ + + return &Client{ + con: con.grpcCon, + cli: NewCoordinatorClient(con.grpcCon), + pool: p, + } +} + +func (p *Pool) release() { + p.lock.Lock() + defer p.lock.Unlock() + + grpcCon := p.grpcCon + grpcCon.refCount-- + grpcCon.refCount = max(grpcCon.refCount, 0) + + if grpcCon.refCount == 0 { + stopClosing := make(chan any) + grpcCon.stopClosing = stopClosing + + go func() { + select { + case <-stopClosing: + return + + case <-time.After(time.Minute): + p.lock.Lock() + defer p.lock.Unlock() + + if grpcCon.refCount == 0 { + grpcCon.grpcCon.Close() + p.grpcCon = nil + } + } + }() + } +} diff --git a/common/pkgs/rpc/coordinator/server.go b/common/pkgs/rpc/coordinator/server.go new file mode 100644 index 0000000..d4276ad --- /dev/null +++ b/common/pkgs/rpc/coordinator/server.go @@ -0,0 +1,26 @@ +package corrpc + +import ( + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" +) + +type CoordinatorAPI interface { + HubService + StorageService +} + +type Server struct { + UnimplementedCoordinatorServer + *rpc.ServerBase + svrImpl CoordinatorAPI +} + +func NewServer(cfg rpc.Config, impl CoordinatorAPI) *Server { + svr := &Server{ + svrImpl: impl, + } + svr.ServerBase = rpc.NewServerBase(cfg, svr, &Coordinator_ServiceDesc) + return svr +} + +var _ CoordinatorServer = (*Server)(nil) diff --git a/common/pkgs/rpc/coordinator/storage.go b/common/pkgs/rpc/coordinator/storage.go new file mode 100644 index 0000000..e623648 --- /dev/null +++ b/common/pkgs/rpc/coordinator/storage.go @@ -0,0 +1,41 @@ +package corrpc + +import ( + context "context" + + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" + cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" +) + +type StorageService interface { + GetStorageDetails(ctx context.Context, msg *GetStorageDetails) (*GetStorageDetailsResp, *rpc.CodeError) +} + +// 获取Storage信息 + +type GetStorageDetails struct { + StorageIDs []cortypes.StorageID `json:"storageIDs"` +} +type GetStorageDetailsResp struct { + Storage []*cortypes.StorageDetail `json:"storages"` +} + +func ReqGetStorageDetails(storageIDs []cortypes.StorageID) *GetStorageDetails { + return &GetStorageDetails{ + StorageIDs: storageIDs, + } +} +func RespGetStorageDetails(stgs []*cortypes.StorageDetail) *GetStorageDetailsResp { + return &GetStorageDetailsResp{ + Storage: stgs, + } +} +func (c *Client) GetStorageDetails(ctx context.Context, msg *GetStorageDetails) (*GetStorageDetailsResp, *rpc.CodeError) { + if c.fusedErr != nil { + return nil, c.fusedErr + } + return rpc.UnaryClient[*GetStorageDetailsResp](c.cli.GetStorageDetails, ctx, msg) +} +func (s *Server) GetStorageDetails(ctx context.Context, msg *rpc.Request) (*rpc.Response, error) { + return rpc.UnaryServer(s.svrImpl.GetStorageDetails, ctx, msg) +} diff --git a/coordinator/internal/cmd/serve.go b/coordinator/internal/cmd/serve.go index e03aa22..7b7d5e4 100644 --- a/coordinator/internal/cmd/serve.go +++ b/coordinator/internal/cmd/serve.go @@ -6,14 +6,14 @@ import ( "github.com/spf13/cobra" "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/mq" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" - coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" + corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/config" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/db" - mymq "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/mq" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/repl" + myrpc "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/rpc" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/ticktock" ) @@ -44,8 +44,7 @@ func serve(configPath string) { os.Exit(1) } - stgglb.InitMQPool(config.Cfg().RabbitMQ) - stgglb.InitHubRPCPool(hubrpc.PoolConfig{}) + stgglb.InitPools(&hubrpc.PoolConfig{}, nil) db2, err := db.NewDB(&config.Cfg().DB) if err != nil { @@ -60,17 +59,10 @@ func serve(configPath string) { // } // go servePublisher(evtPub) - coorSvr, err := coormq.NewServer(mymq.NewService(db2), config.Cfg().RabbitMQ) - if err != nil { - logger.Fatalf("new coordinator server failed, err: %s", err.Error()) - } - - coorSvr.OnError(func(err error) { - logger.Warnf("coordinator server err: %s", err.Error()) - }) - - // 启动服务 - go serveCoorServer(coorSvr, config.Cfg().RabbitMQ) + // RPC服务 + rpcSvr := corrpc.NewServer(config.Cfg().RPC, myrpc.NewService(db2)) + rpcSvrChan := rpcSvr.Start() + defer rpcSvr.Stop() // 定时任务 tktk := ticktock.New(config.Cfg().TickTock, db2) @@ -83,6 +75,7 @@ func serve(configPath string) { /// 开始监听各个模块的事件 replEvt := replCh.Receive() + rpcEvt := rpcSvrChan.Receive() loop: for { select { @@ -98,6 +91,23 @@ loop: break loop } replEvt = replCh.Receive() + + case e := <-rpcEvt.Chan(): + if e.Err != nil { + logger.Errorf("receive rpc event: %v", e.Err) + break loop + } + + switch e := e.Value.(type) { + case rpc.ExitEvent: + if e.Err != nil { + logger.Errorf("rpc server exited with error: %v", e.Err) + } else { + logger.Infof("rpc server exited") + } + break loop + } + rpcEvt = rpcSvrChan.Receive() } } } @@ -136,38 +146,3 @@ loop: // // TODO 仅简单结束了程序 // os.Exit(1) // } - -func serveCoorServer(server *coormq.Server, cfg mq.Config) { - logger.Info("start serving command server") - - ch := server.Start(cfg) - if ch == nil { - logger.Errorf("RabbitMQ logEvent is nil") - os.Exit(1) - } - -loop: - for { - val, err := ch.Receive() - if err != nil { - logger.Errorf("command server stopped with error: %s", err.Error()) - break - } - - switch val := val.(type) { - case error: - logger.Errorf("rabbitmq connect with error: %v", val) - case mq.ServerExit: - if val.Error != nil { - logger.Errorf("rabbitmq server exit with error: %v", val.Error) - } else { - logger.Info("rabbitmq server exit") - } - break loop - } - } - logger.Info("command server stopped") - - // TODO 仅简单结束了程序 - os.Exit(1) -} diff --git a/coordinator/internal/config/config.go b/coordinator/internal/config/config.go index 678ced6..c800308 100644 --- a/coordinator/internal/config/config.go +++ b/coordinator/internal/config/config.go @@ -4,6 +4,7 @@ import ( log "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" c "gitlink.org.cn/cloudream/common/utils/config" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/db" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/ticktock" ) @@ -13,6 +14,7 @@ type Config struct { DB db.Config `json:"db"` RabbitMQ mq.Config `json:"rabbitMQ"` TickTock ticktock.Config `json:"tickTock"` + RPC rpc.Config `json:"rpc"` } var cfg Config diff --git a/coordinator/internal/mq/hub.go b/coordinator/internal/rpc/hub.go similarity index 54% rename from coordinator/internal/mq/hub.go rename to coordinator/internal/rpc/hub.go index 5ddd286..8bbec3e 100644 --- a/coordinator/internal/mq/hub.go +++ b/coordinator/internal/rpc/hub.go @@ -1,35 +1,36 @@ -package mq +package rpc import ( + "context" "fmt" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/mq" - coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" + corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) -func (svc *Service) GetHubConfig(msg *coormq.GetHubConfig) (*coormq.GetHubConfigResp, *mq.CodeMessage) { +func (svc *Service) GetHubConfig(ctx context.Context, msg *corrpc.GetHubConfig) (*corrpc.GetHubConfigResp, *rpc.CodeError) { log := logger.WithField("HubID", msg.HubID) hub, err := svc.db.Hub().GetByID(svc.db.DefCtx(), msg.HubID) if err != nil { log.Warnf("getting hub: %v", err) - return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("getting hub: %v", err)) + return nil, rpc.Failed(errorcode.OperationFailed, fmt.Sprintf("getting hub: %v", err)) } - return mq.ReplyOK(coormq.RespGetHubConfig(hub)) + return corrpc.RespGetHubConfig(hub), nil } -func (svc *Service) GetHubs(msg *coormq.GetHubs) (*coormq.GetHubsResp, *mq.CodeMessage) { +func (svc *Service) GetHubs(ctx context.Context, msg *corrpc.GetHubs) (*corrpc.GetHubsResp, *rpc.CodeError) { var hubs []*cortypes.Hub if msg.HubIDs == nil { get, err := svc.db.Hub().GetAllHubs(svc.db.DefCtx()) if err != nil { logger.Warnf("getting all hubs: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "get all hub failed") + return nil, rpc.Failed(errorcode.OperationFailed, "get all hub failed") } for _, hub := range get { h := hub @@ -41,7 +42,7 @@ func (svc *Service) GetHubs(msg *coormq.GetHubs) (*coormq.GetHubsResp, *mq.CodeM get, err := svc.db.Hub().BatchGetByID(svc.db.DefCtx(), msg.HubIDs) if err != nil { logger.Warnf("batch get hubs by id: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("batch get hubs by id: %v", err)) + return nil, rpc.Failed(errorcode.OperationFailed, fmt.Sprintf("batch get hubs by id: %v", err)) } getMp := make(map[cortypes.HubID]cortypes.Hub) @@ -59,15 +60,15 @@ func (svc *Service) GetHubs(msg *coormq.GetHubs) (*coormq.GetHubsResp, *mq.CodeM } } - return mq.ReplyOK(coormq.NewGetHubsResp(hubs)) + return corrpc.NewGetHubsResp(hubs), nil } -func (svc *Service) GetHubConnectivities(msg *coormq.GetHubConnectivities) (*coormq.GetHubConnectivitiesResp, *mq.CodeMessage) { +func (svc *Service) GetHubConnectivities(ctx context.Context, msg *corrpc.GetHubConnectivities) (*corrpc.GetHubConnectivitiesResp, *rpc.CodeError) { cons, err := svc.db.HubConnectivity().BatchGetByFromHub(svc.db.DefCtx(), msg.HubIDs) if err != nil { logger.Warnf("batch get hub connectivities by from hub: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "batch get hub connectivities by from hub failed") + return nil, rpc.Failed(errorcode.OperationFailed, "batch get hub connectivities by from hub failed") } - return mq.ReplyOK(coormq.RespGetHubConnectivities(cons)) + return corrpc.RespGetHubConnectivities(cons), nil } diff --git a/coordinator/internal/mq/service.go b/coordinator/internal/rpc/service.go similarity index 93% rename from coordinator/internal/mq/service.go rename to coordinator/internal/rpc/service.go index e562299..03ecdee 100644 --- a/coordinator/internal/mq/service.go +++ b/coordinator/internal/rpc/service.go @@ -1,4 +1,4 @@ -package mq +package rpc import ( "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/db" diff --git a/coordinator/internal/mq/storage.go b/coordinator/internal/rpc/storage.go similarity index 76% rename from coordinator/internal/mq/storage.go rename to coordinator/internal/rpc/storage.go index 06a97e6..9aa6c79 100644 --- a/coordinator/internal/mq/storage.go +++ b/coordinator/internal/rpc/storage.go @@ -1,18 +1,19 @@ -package mq +package rpc import ( + "context" "fmt" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/mq" - coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" + corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" "gitlink.org.cn/cloudream/jcs-pub/coordinator/internal/db" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) -func (svc *Service) GetStorageDetails(msg *coormq.GetStorageDetails) (*coormq.GetStorageDetailsResp, *mq.CodeMessage) { +func (svc *Service) GetStorageDetails(ctx context.Context, msg *corrpc.GetStorageDetails) (*corrpc.GetStorageDetailsResp, *rpc.CodeError) { d := svc.db stgs, err := db.DoTx02(d, func(tx db.SQLContext) ([]*cortypes.StorageDetail, error) { stgs, err := d.Storage().BatchGetByID(tx, msg.StorageIDs) @@ -59,8 +60,8 @@ func (svc *Service) GetStorageDetails(msg *coormq.GetStorageDetails) (*coormq.Ge }) if err != nil { logger.Warnf("getting storage details: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("getting storage details: %v", err)) + return nil, rpc.Failed(errorcode.OperationFailed, fmt.Sprintf("getting storage details: %v", err)) } - return mq.ReplyOK(coormq.RespGetStorageDetails(stgs)) + return corrpc.RespGetStorageDetails(stgs), nil } diff --git a/coordinator/internal/ticktock/check_hub_state.go b/coordinator/internal/ticktock/check_hub_state.go index 6743cbf..b2ad2db 100644 --- a/coordinator/internal/ticktock/check_hub_state.go +++ b/coordinator/internal/ticktock/check_hub_state.go @@ -73,7 +73,7 @@ func (j *CheckHubState) checkOne(t *TickTock, hub cortypes.Hub) error { } } - return fmt.Errorf("getting state: %w", cerr) + return fmt.Errorf("getting state: %w", cerr.ToError()) } // TODO 如果以后还有其他的状态,要判断哪些状态下能设置Normal diff --git a/hub/internal/cmd/serve.go b/hub/internal/cmd/serve.go index 98923d6..9382f54 100644 --- a/hub/internal/cmd/serve.go +++ b/hub/internal/cmd/serve.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "time" "github.com/go-co-op/gocron/v2" "github.com/spf13/cobra" @@ -21,7 +22,7 @@ import ( cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" "gitlink.org.cn/cloudream/jcs-pub/hub/internal/config" - coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/coordinator" + coormq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" ) func init() { @@ -54,8 +55,7 @@ func serve(configPath string, httpAddr string) { } stgglb.InitLocal(config.Cfg().Local) - stgglb.InitMQPool(config.Cfg().RabbitMQ) - stgglb.InitHubRPCPool(hubrpc.PoolConfig{}) + stgglb.InitPools(&hubrpc.PoolConfig{}, &config.Cfg().CoordinatorRPC) // stgglb.Stats.SetupHubStorageTransfer(*config.Cfg().Local.HubID) // stgglb.Stats.SetupHubTransfer(*config.Cfg().Local.HubID) // 获取Hub配置 @@ -172,16 +172,15 @@ loop: } func downloadHubConfig() coormq.GetHubConfigResp { - coorCli, err := stgglb.CoordinatorMQPool.Acquire() - if err != nil { - logger.Errorf("new coordinator client: %v", err) - os.Exit(1) - } - defer stgglb.CoordinatorMQPool.Release(coorCli) + coorCli := stgglb.CoordinatorRPCPool.Get() + defer coorCli.Release() - cfgResp, err := coorCli.GetHubConfig(coormq.ReqGetHubConfig(cortypes.HubID(config.Cfg().ID))) - if err != nil { - logger.Errorf("getting hub config: %v", err) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + cfgResp, cerr := coorCli.GetHubConfig(ctx, coormq.ReqGetHubConfig(cortypes.HubID(config.Cfg().ID))) + if cerr != nil { + logger.Errorf("getting hub config: %v", cerr) os.Exit(1) } diff --git a/hub/internal/config/config.go b/hub/internal/config/config.go index 0ffccb4..e72a354 100644 --- a/hub/internal/config/config.go +++ b/hub/internal/config/config.go @@ -7,16 +7,18 @@ import ( stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" + corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" cortypes "gitlink.org.cn/cloudream/jcs-pub/coordinator/types" ) type Config struct { - ID cortypes.HubID `json:"id"` - Local stgglb.LocalMachineInfo `json:"local"` - RPC rpc.Config `json:"rpc"` - Logger log.Config `json:"logger"` - RabbitMQ mq.Config `json:"rabbitMQ"` - Connectivity connectivity.Config `json:"connectivity"` + ID cortypes.HubID `json:"id"` + Local stgglb.LocalMachineInfo `json:"local"` + RPC rpc.Config `json:"rpc"` + CoordinatorRPC corrpc.PoolConfig `json:"coordinatorRPC"` + Logger log.Config `json:"logger"` + RabbitMQ mq.Config `json:"rabbitMQ"` + Connectivity connectivity.Config `json:"connectivity"` } var cfg Config