diff --git a/client/internal/cluster/cluster.go b/client/internal/cluster/cluster.go new file mode 100644 index 0000000..493259e --- /dev/null +++ b/client/internal/cluster/cluster.go @@ -0,0 +1,78 @@ +package cluster + +import ( + "context" + "time" + + "gitlink.org.cn/cloudream/common/pkgs/logger" + clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client" +) + +type Cluster struct { + cfg Config + cliPool *clirpc.Pool +} + +func New(cfg *Config) *Cluster { + c := Config{} + if cfg != nil { + c = *cfg + } else { + c.IsMaster = true + } + + return &Cluster{ + cfg: c, + } +} + +func (c *Cluster) Start() error { + log := logger.WithField("Mod", "Cluster") + + if c.cfg.IsMaster { + log.Infof("cluster start as master") + return nil + } + + poolCfgJSON := clirpc.PoolConfigJSON{ + Address: c.cfg.MasterAddress, + RootCA: c.cfg.RootCA, + ClientCert: c.cfg.ClientCert, + ClientKey: c.cfg.ClientKey, + } + + poolCfg, err := poolCfgJSON.Build() + if err != nil { + return err + } + + c.cliPool = clirpc.NewPool(*poolCfg) + for { + cli := c.cliPool.Get() + + ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second) + resp, cerr := cli.GetClusterMasterInfo(ctx, &clirpc.GetClusterMasterInfo{}) + cancelFn() + if cerr != nil { + log.Warnf("first report: %v, will retry after 3 seconds", err) + time.Sleep(3 * time.Second) + continue + } + + log.Infof("cluster start as slave, master is: %v", resp.Name) + break + } + return nil +} + +func (c *Cluster) IsMaster() bool { + return c.cfg.IsMaster +} + +func (c *Cluster) Name() string { + return c.cfg.Name +} + +func (c *Cluster) MasterClient() *clirpc.Pool { + return c.cliPool +} diff --git a/client/internal/cluster/config.go b/client/internal/cluster/config.go new file mode 100644 index 0000000..b5dfd2c --- /dev/null +++ b/client/internal/cluster/config.go @@ -0,0 +1,10 @@ +package cluster + +type Config struct { + IsMaster bool `json:"isMaster"` + Name string `json:"name"` + MasterAddress string `json:"masterAddress"` + RootCA string `json:"rootCA"` + ClientCert string `json:"clientCert"` + ClientKey string `json:"clientKey"` +} diff --git a/client/internal/cmdline/serve.go b/client/internal/cmdline/serve.go index 8e4139a..9f6a6bd 100644 --- a/client/internal/cmdline/serve.go +++ b/client/internal/cmdline/serve.go @@ -9,6 +9,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/jcs-pub/client/internal/accessstat" "gitlink.org.cn/cloudream/jcs-pub/client/internal/accesstoken" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster" "gitlink.org.cn/cloudream/jcs-pub/client/internal/config" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader" @@ -76,6 +77,14 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { stgglb.InitLocal(config.Cfg().Local) + // 集群模式 + clster := cluster.New(config.Cfg().Cluster) + err = clster.Start() + if err != nil { + logger.Errorf("start cluster failed, err: %v", err) + os.Exit(1) + } + // 数据库 db, err := db.NewDB(&config.Cfg().DB) if err != nil { @@ -195,12 +204,12 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { uploader := uploader.NewUploader(publock, conCol, stgPool, spaceMeta, db) // 定时任务 - tktk := ticktock.New(config.Cfg().TickTock, db, spaceMeta, stgPool, evtPub, publock, spdStats) + tktk := ticktock.New(config.Cfg().TickTock, db, spaceMeta, stgPool, evtPub, publock, spdStats, clster) tktk.Start() defer tktk.Stop() // 用户空间同步功能 - spaceSync := spacesyncer.New(db, stgPool, spaceMeta) + spaceSync := spacesyncer.New(db, stgPool, spaceMeta, clster) spaceSyncChan := spaceSync.Start() defer spaceSync.Stop() @@ -244,7 +253,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { defer httpSvr.Stop() // RPC接口 - rpcSvr := clirpc.NewServer(config.Cfg().RPC, myrpc.NewService(publock), nil) + rpcSvr := clirpc.NewServer(config.Cfg().RPC, myrpc.NewService(publock, clster), nil) rpcChan := rpcSvr.Start() defer rpcSvr.Stop() diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go index bb5707b..31447b5 100644 --- a/client/internal/cmdline/test.go +++ b/client/internal/cmdline/test.go @@ -10,6 +10,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/jcs-pub/client/internal/accessstat" "gitlink.org.cn/cloudream/jcs-pub/client/internal/accesstoken" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster" "gitlink.org.cn/cloudream/jcs-pub/client/internal/config" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader" @@ -87,6 +88,14 @@ func test(configPath string) { stgglb.StandaloneMode = config.Cfg().AccessToken == nil + // 集群模式 + clster := cluster.New(config.Cfg().Cluster) + err = clster.Start() + if err != nil { + logger.Errorf("start cluster failed, err: %v", err) + os.Exit(1) + } + // 数据库 db, err := db.NewDB(&config.Cfg().DB) if err != nil { @@ -203,7 +212,7 @@ func test(configPath string) { uploader := uploader.NewUploader(publock, conCol, stgPool, stgMeta, db) // 用户空间同步功能 - spaceSync := spacesyncer.New(db, stgPool, stgMeta) + spaceSync := spacesyncer.New(db, stgPool, stgMeta, clster) spaceSyncChan := spaceSync.Start() defer spaceSync.Stop() diff --git a/client/internal/cmdline/vfstest.go b/client/internal/cmdline/vfstest.go index 93589e3..238c89f 100644 --- a/client/internal/cmdline/vfstest.go +++ b/client/internal/cmdline/vfstest.go @@ -10,6 +10,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/jcs-pub/client/internal/accessstat" "gitlink.org.cn/cloudream/jcs-pub/client/internal/accesstoken" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster" "gitlink.org.cn/cloudream/jcs-pub/client/internal/config" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader" @@ -67,6 +68,14 @@ func vfsTest(configPath string, opts serveHTTPOptions) { stgglb.StandaloneMode = opts.Standalone || config.Cfg().AccessToken == nil + // 集群模式 + clster := cluster.New(config.Cfg().Cluster) + err = clster.Start() + if err != nil { + logger.Errorf("start cluster failed, err: %v", err) + os.Exit(1) + } + // 数据库 db, err := db.NewDB(&config.Cfg().DB) if err != nil { @@ -183,7 +192,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) { uploader := uploader.NewUploader(publock, conCol, stgPool, stgMeta, db) // 用户空间同步功能 - spaceSync := spacesyncer.New(db, stgPool, stgMeta) + spaceSync := spacesyncer.New(db, stgPool, stgMeta, clster) spaceSyncChan := spaceSync.Start() defer spaceSync.Stop() diff --git a/client/internal/config/config.go b/client/internal/config/config.go index 38727d0..3f1e7bc 100644 --- a/client/internal/config/config.go +++ b/client/internal/config/config.go @@ -4,6 +4,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/config" "gitlink.org.cn/cloudream/jcs-pub/client/internal/accesstoken" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader" "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader/strategy" @@ -33,6 +34,7 @@ type Config struct { RPC rpc.Config `json:"rpc"` Mount *mntcfg.Config `json:"mount"` AccessToken *accesstoken.Config `json:"accessToken"` + Cluster *cluster.Config `json:"cluster"` } var cfg Config diff --git a/client/internal/rpc/cluster.go b/client/internal/rpc/cluster.go new file mode 100644 index 0000000..abce8d9 --- /dev/null +++ b/client/internal/rpc/cluster.go @@ -0,0 +1,12 @@ +package rpc + +import ( + "context" + + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" + clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client" +) + +func (svc *Service) GetClusterMasterInfo(ctx context.Context, msg *clirpc.GetClusterMasterInfo) (*clirpc.GetClusterMasterInfoResp, *rpc.CodeError) { + return &clirpc.GetClusterMasterInfoResp{Name: svc.cluster.Name()}, nil +} diff --git a/client/internal/rpc/rpc.go b/client/internal/rpc/rpc.go index 110779e..280b9f3 100644 --- a/client/internal/rpc/rpc.go +++ b/client/internal/rpc/rpc.go @@ -1,17 +1,20 @@ package rpc import ( + "gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster" "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock" clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client" ) type Service struct { pubLock *publock.PubLock + cluster *cluster.Cluster } -func NewService(pubLock *publock.PubLock) *Service { +func NewService(pubLock *publock.PubLock, cluster *cluster.Cluster) *Service { return &Service{ pubLock: pubLock, + cluster: cluster, } } diff --git a/client/internal/services/user_space.go b/client/internal/services/user_space.go index 8be221d..2ced987 100644 --- a/client/internal/services/user_space.go +++ b/client/internal/services/user_space.go @@ -111,7 +111,7 @@ func (svc *UserSpaceService) Update(req cliapi.UserSpaceUpdate) (*cliapi.UserSpa // 通知元数据缓存无效 svc.UserSpaceMeta.Drop([]jcstypes.UserSpaceID{req.UserSpaceID}) - // 通知存储服务组件池停止组件。TODO 对于在Hub上运行的组件,需要一个机制去定时清理 + // 通知存储服务组件池停止组件。TODO 对于在Hub上运行的组件,需要一个机制去定时清理。还有集群模式 svc.StgPool.Drop(stgglb.UserID, space.UserSpaceID) // TODO 考虑加锁再进行操作 diff --git a/client/internal/spacesyncer/space_syncer.go b/client/internal/spacesyncer/space_syncer.go index 1b5b52f..252ee3b 100644 --- a/client/internal/spacesyncer/space_syncer.go +++ b/client/internal/spacesyncer/space_syncer.go @@ -7,6 +7,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/async" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" stgpool "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" @@ -25,15 +26,17 @@ type SpaceSyncer struct { db *db.DB stgPool *stgpool.Pool spaceMeta *metacache.UserSpaceMeta + cluster *cluster.Cluster lock sync.Mutex tasks map[jcstypes.SpaceSyncTaskID]*task } -func New(db *db.DB, stgPool *stgpool.Pool, spaceMeta *metacache.UserSpaceMeta) *SpaceSyncer { +func New(db *db.DB, stgPool *stgpool.Pool, spaceMeta *metacache.UserSpaceMeta, cluster *cluster.Cluster) *SpaceSyncer { return &SpaceSyncer{ db: db, stgPool: stgPool, spaceMeta: spaceMeta, + cluster: cluster, tasks: make(map[jcstypes.SpaceSyncTaskID]*task), } } @@ -46,6 +49,11 @@ func (s *SpaceSyncer) Start() *async.UnboundChannel[SpaceSyncerEvent] { ch := async.NewUnboundChannel[SpaceSyncerEvent]() + if !s.cluster.IsMaster() { + log.Infof("not master, skip start space syncer") + return ch + } + allTask, err := db.DoTx01(s.db, s.db.SpaceSyncTask().GetAll) if err != nil { log.Warnf("load task from db: %v", err) @@ -91,6 +99,10 @@ func (s *SpaceSyncer) Stop() { s.lock.Lock() defer s.lock.Unlock() + if !s.cluster.IsMaster() { + return + } + for _, t := range s.tasks { t.CancelFn() } @@ -101,6 +113,10 @@ func (s *SpaceSyncer) Stop() { func (s *SpaceSyncer) CreateTask(t jcstypes.SpaceSyncTask) (*TaskInfo, error) { log := logger.WithField("Mod", logMod) + if !s.cluster.IsMaster() { + return nil, fmt.Errorf("not master, create task aborted") + } + d := s.db err := d.DoTx(func(tx db.SQLContext) error { err := d.SpaceSyncTask().Create(tx, &t) @@ -146,6 +162,10 @@ func (s *SpaceSyncer) CreateTask(t jcstypes.SpaceSyncTask) (*TaskInfo, error) { func (s *SpaceSyncer) CancelTask(taskID jcstypes.SpaceSyncTaskID) { log := logger.WithField("Mod", logMod) + if !s.cluster.IsMaster() { + return + } + s.lock.Lock() defer s.lock.Unlock() @@ -170,6 +190,10 @@ func (s *SpaceSyncer) GetTask(taskID jcstypes.SpaceSyncTaskID) *jcstypes.SpaceSy s.lock.Lock() defer s.lock.Unlock() + if !s.cluster.IsMaster() { + return nil + } + tsk := s.tasks[taskID] if tsk == nil { return nil diff --git a/client/internal/ticktock/change_redundancy.go b/client/internal/ticktock/change_redundancy.go index bc7556e..763e4c9 100644 --- a/client/internal/ticktock/change_redundancy.go +++ b/client/internal/ticktock/change_redundancy.go @@ -32,6 +32,11 @@ func (j *ChangeRedundancy) Execute(t *TickTock) { log.Infof("job end, time: %v", time.Since(startTime)) }() + if !t.cluster.IsMaster() { + log.Infof("not master, skip") + return + } + ctx := &changeRedundancyContext{ ticktock: t, allUserSpaces: make(map[jcstypes.UserSpaceID]*userSpaceUsageInfo), diff --git a/client/internal/ticktock/check_shardstore.go b/client/internal/ticktock/check_shardstore.go index 91a6197..9ca5f9c 100644 --- a/client/internal/ticktock/check_shardstore.go +++ b/client/internal/ticktock/check_shardstore.go @@ -30,6 +30,11 @@ func (j *CheckShardStore) Execute(t *TickTock) { log.Infof("job end, time: %v", time.Since(startTime)) }() + if !t.cluster.IsMaster() { + log.Infof("not master, skip") + return + } + db2 := t.db spaceIDs, err := db2.UserSpace().GetAllIDs(db2.DefCtx()) diff --git a/client/internal/ticktock/ticktock.go b/client/internal/ticktock/ticktock.go index a081432..bde3547 100644 --- a/client/internal/ticktock/ticktock.go +++ b/client/internal/ticktock/ticktock.go @@ -6,6 +6,7 @@ import ( "github.com/go-co-op/gocron/v2" "github.com/samber/lo" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster" "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock" @@ -34,9 +35,10 @@ type TickTock struct { evtPub *sysevent.Publisher pubLock *publock.PubLock speedStats *speedstats.SpeedStats + cluster *cluster.Cluster } -func New(cfg Config, db *db.DB, spaceMeta *metacache.UserSpaceMeta, stgPool *pool.Pool, evtPub *sysevent.Publisher, pubLock *publock.PubLock, speedStats *speedstats.SpeedStats) *TickTock { +func New(cfg Config, db *db.DB, spaceMeta *metacache.UserSpaceMeta, stgPool *pool.Pool, evtPub *sysevent.Publisher, pubLock *publock.PubLock, speedStats *speedstats.SpeedStats, cluster *cluster.Cluster) *TickTock { sch, _ := gocron.NewScheduler() t := &TickTock{ cfg: cfg, @@ -48,6 +50,7 @@ func New(cfg Config, db *db.DB, spaceMeta *metacache.UserSpaceMeta, stgPool *poo evtPub: evtPub, pubLock: pubLock, speedStats: speedStats, + cluster: cluster, } t.initJobs() return t diff --git a/client/internal/ticktock/update_package_access_stat_amount.go b/client/internal/ticktock/update_package_access_stat_amount.go index 01f86e8..17475e1 100644 --- a/client/internal/ticktock/update_package_access_stat_amount.go +++ b/client/internal/ticktock/update_package_access_stat_amount.go @@ -22,6 +22,11 @@ func (j *UpdatePackageAccessStatAmount) Execute(t *TickTock) { log.Infof("job end, time: %v", time.Since(startTime)) }() + if !t.cluster.IsMaster() { + log.Infof("not master, skip") + return + } + err := t.db.PackageAccessStat().UpdateAllAmount(t.db.DefCtx(), t.cfg.AccessStatHistoryWeight) if err != nil { log.Warnf("update all package access stat amount: %v", err) diff --git a/client/internal/ticktock/user_space_gc.go b/client/internal/ticktock/user_space_gc.go index 330b369..8790b89 100644 --- a/client/internal/ticktock/user_space_gc.go +++ b/client/internal/ticktock/user_space_gc.go @@ -26,6 +26,11 @@ func (j *UserSpaceGC) Execute(t *TickTock) { log.Infof("job end, time: %v", time.Since(startTime)) }() + if !t.cluster.IsMaster() { + log.Infof("not master, skip") + return + } + spaceIDs, err := t.db.UserSpace().GetAllIDs(t.db.DefCtx()) if err != nil { log.Warnf("getting user space ids: %v", err) diff --git a/common/pkgs/rpc/client/client.pb.go b/common/pkgs/rpc/client/client.pb.go index 5f602b7..d51a97b 100644 --- a/common/pkgs/rpc/client/client.pb.go +++ b/common/pkgs/rpc/client/client.pb.go @@ -26,16 +26,19 @@ var file_pkgs_rpc_client_client_proto_rawDesc = []byte{ 0x0a, 0x1c, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x1a, 0x12, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, - 0x2f, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x3b, 0x0a, 0x06, 0x43, 0x6c, + 0x2f, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x70, 0x0a, 0x06, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x12, 0x31, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x4c, 0x6f, 0x63, 0x6b, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x40, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x6c, 0x69, - 0x6e, 0x6b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x63, 0x6e, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x72, - 0x65, 0x61, 0x6d, 0x2f, 0x6a, 0x63, 0x73, 0x2d, 0x70, 0x75, 0x62, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, - 0x6f, 0x6e, 0x2f, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6c, 0x69, 0x72, - 0x70, 0x63, 0x3b, 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x33, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x43, 0x6c, + 0x75, 0x73, 0x74, 0x65, 0x72, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, + 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, + 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x40, 0x5a, 0x3e, + 0x67, 0x69, 0x74, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x63, 0x6e, 0x2f, 0x63, + 0x6c, 0x6f, 0x75, 0x64, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x6a, 0x63, 0x73, 0x2d, 0x70, 0x75, 0x62, + 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, + 0x2f, 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x3b, 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var file_pkgs_rpc_client_client_proto_goTypes = []any{ @@ -44,9 +47,11 @@ var file_pkgs_rpc_client_client_proto_goTypes = []any{ } var file_pkgs_rpc_client_client_proto_depIdxs = []int32{ 0, // 0: clirpc.Client.PubLockChannel:input_type -> rpc.Request - 1, // 1: clirpc.Client.PubLockChannel:output_type -> rpc.Response - 1, // [1:2] is the sub-list for method output_type - 0, // [0:1] is the sub-list for method input_type + 0, // 1: clirpc.Client.GetClusterMasterInfo:input_type -> rpc.Request + 1, // 2: clirpc.Client.PubLockChannel:output_type -> rpc.Response + 1, // 3: clirpc.Client.GetClusterMasterInfo:output_type -> rpc.Response + 2, // [2:4] is the sub-list for method output_type + 0, // [0:2] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/common/pkgs/rpc/client/client.proto b/common/pkgs/rpc/client/client.proto index 390b2e2..3091b0d 100644 --- a/common/pkgs/rpc/client/client.proto +++ b/common/pkgs/rpc/client/client.proto @@ -8,4 +8,6 @@ option go_package = "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/clirpc;cli service Client { rpc PubLockChannel(stream rpc.Request) returns(stream rpc.Response); + + rpc GetClusterMasterInfo(rpc.Request) returns(rpc.Response); } \ No newline at end of file diff --git a/common/pkgs/rpc/client/client_grpc.pb.go b/common/pkgs/rpc/client/client_grpc.pb.go index 16f24ea..11bca54 100644 --- a/common/pkgs/rpc/client/client_grpc.pb.go +++ b/common/pkgs/rpc/client/client_grpc.pb.go @@ -20,7 +20,8 @@ import ( const _ = grpc.SupportPackageIsVersion7 const ( - Client_PubLockChannel_FullMethodName = "/clirpc.Client/PubLockChannel" + Client_PubLockChannel_FullMethodName = "/clirpc.Client/PubLockChannel" + Client_GetClusterMasterInfo_FullMethodName = "/clirpc.Client/GetClusterMasterInfo" ) // ClientClient is the client API for Client service. @@ -28,6 +29,7 @@ const ( // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type ClientClient interface { PubLockChannel(ctx context.Context, opts ...grpc.CallOption) (Client_PubLockChannelClient, error) + GetClusterMasterInfo(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) } type clientClient struct { @@ -69,11 +71,21 @@ func (x *clientPubLockChannelClient) Recv() (*rpc.Response, error) { return m, nil } +func (c *clientClient) GetClusterMasterInfo(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) { + out := new(rpc.Response) + err := c.cc.Invoke(ctx, Client_GetClusterMasterInfo_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // ClientServer is the server API for Client service. // All implementations must embed UnimplementedClientServer // for forward compatibility type ClientServer interface { PubLockChannel(Client_PubLockChannelServer) error + GetClusterMasterInfo(context.Context, *rpc.Request) (*rpc.Response, error) mustEmbedUnimplementedClientServer() } @@ -84,6 +96,9 @@ type UnimplementedClientServer struct { func (UnimplementedClientServer) PubLockChannel(Client_PubLockChannelServer) error { return status.Errorf(codes.Unimplemented, "method PubLockChannel not implemented") } +func (UnimplementedClientServer) GetClusterMasterInfo(context.Context, *rpc.Request) (*rpc.Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetClusterMasterInfo not implemented") +} func (UnimplementedClientServer) mustEmbedUnimplementedClientServer() {} // UnsafeClientServer may be embedded to opt out of forward compatibility for this service. @@ -123,13 +138,36 @@ func (x *clientPubLockChannelServer) Recv() (*rpc.Request, error) { return m, nil } +func _Client_GetClusterMasterInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(rpc.Request) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ClientServer).GetClusterMasterInfo(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Client_GetClusterMasterInfo_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ClientServer).GetClusterMasterInfo(ctx, req.(*rpc.Request)) + } + return interceptor(ctx, in, info, handler) +} + // Client_ServiceDesc is the grpc.ServiceDesc for Client service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) var Client_ServiceDesc = grpc.ServiceDesc{ ServiceName: "clirpc.Client", HandlerType: (*ClientServer)(nil), - Methods: []grpc.MethodDesc{}, + Methods: []grpc.MethodDesc{ + { + MethodName: "GetClusterMasterInfo", + Handler: _Client_GetClusterMasterInfo_Handler, + }, + }, Streams: []grpc.StreamDesc{ { StreamName: "PubLockChannel", diff --git a/common/pkgs/rpc/client/cluster.go b/common/pkgs/rpc/client/cluster.go new file mode 100644 index 0000000..75198ca --- /dev/null +++ b/common/pkgs/rpc/client/cluster.go @@ -0,0 +1,29 @@ +package clirpc + +import ( + "context" + + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" +) + +type ClusterService interface { + GetClusterMasterInfo(ctx context.Context, req *GetClusterMasterInfo) (*GetClusterMasterInfoResp, *rpc.CodeError) +} + +type GetClusterMasterInfo struct { +} + +type GetClusterMasterInfoResp struct { + Name string +} + +func (c *Client) GetClusterMasterInfo(ctx context.Context, msg *GetClusterMasterInfo) (*GetClusterMasterInfoResp, *rpc.CodeError) { + if c.fusedErr != nil { + return nil, c.fusedErr + } + + return rpc.UnaryClient[*GetClusterMasterInfoResp](c.cli.GetClusterMasterInfo, ctx, msg) +} +func (s *Server) GetClusterMasterInfo(ctx context.Context, msg *rpc.Request) (*rpc.Response, error) { + return rpc.UnaryServer(s.svrImpl.GetClusterMasterInfo, ctx, msg) +} diff --git a/common/pkgs/rpc/client/pool.go b/common/pkgs/rpc/client/pool.go index 42357f0..ea5f0af 100644 --- a/common/pkgs/rpc/client/pool.go +++ b/common/pkgs/rpc/client/pool.go @@ -24,11 +24,10 @@ type PoolConfigJSON struct { ClientKey string `json:"clientKey"` } -func (c *PoolConfigJSON) Build(tokenProv rpc.AccessTokenProvider) (*PoolConfig, error) { +func (c *PoolConfigJSON) Build() (*PoolConfig, error) { pc := &PoolConfig{ Address: c.Address, } - pc.Conn.AccessTokenProvider = tokenProv rootCA, err := os.ReadFile(c.RootCA) if err != nil { @@ -45,8 +44,6 @@ func (c *PoolConfigJSON) Build(tokenProv rpc.AccessTokenProvider) (*PoolConfig, return nil, fmt.Errorf("load client cert: %v", err) } pc.Conn.ClientCert = &cert - } else if tokenProv == nil { - return nil, fmt.Errorf("must provide client cert or access token provider") } return pc, nil diff --git a/common/pkgs/rpc/client/server.go b/common/pkgs/rpc/client/server.go index 4c8e215..9bfaf64 100644 --- a/common/pkgs/rpc/client/server.go +++ b/common/pkgs/rpc/client/server.go @@ -5,6 +5,7 @@ import ( ) type ClientAPI interface { + ClusterService PubLockService }