diff --git a/scanner/README.md b/scanner/README.md deleted file mode 100644 index b0fda77..0000000 --- a/scanner/README.md +++ /dev/null @@ -1,8 +0,0 @@ -# Scanner服务 - -## 目录结构 -- `internal`:服务源码。 - - `config`:服务使用的配置文件结构定义。 - - `event`:被投递到队列顺序执行的事件。 - - `mq`:通过rabbitmq对外提供的接口。实现了`common\pkgs\mq\scanner`目录里文件定义的接口。 - - `tickevent`:定时执行的事件。 \ No newline at end of file diff --git a/scanner/internal/config/config.go b/scanner/internal/config/config.go deleted file mode 100644 index 1e4dd33..0000000 --- a/scanner/internal/config/config.go +++ /dev/null @@ -1,29 +0,0 @@ -package config - -import ( - "gitlink.org.cn/cloudream/common/pkgs/distlock" - log "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/mq" - c "gitlink.org.cn/cloudream/common/utils/config" - db "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/db2/config" -) - -type Config struct { - AccessStatHistoryAmount float64 `json:"accessStatHistoryAmount"` - ECFileSizeThreshold int64 `json:"ecFileSizeThreshold"` - HubUnavailableSeconds int `json:"hubUnavailableSeconds"` // 如果节点上次上报时间超过这个值,则认为节点已经不可用 - Logger log.Config `json:"logger"` - DB db.Config `json:"db"` - RabbitMQ mq.Config `json:"rabbitMQ"` - DistLock distlock.Config `json:"distlock"` -} - -var cfg Config - -func Init() error { - return c.DefaultLoad("scanner", &cfg) -} - -func Cfg() *Config { - return &cfg -} diff --git a/scanner/internal/event/agent_check_state.go b/scanner/internal/event/agent_check_state.go deleted file mode 100644 index 727c862..0000000 --- a/scanner/internal/event/agent_check_state.go +++ /dev/null @@ -1,81 +0,0 @@ -package event - -import ( - "database/sql" - "time" - - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/mq" - "gitlink.org.cn/cloudream/jcs-pub/common/consts" - stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" - agtmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/hub" - scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event" - "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/config" -) - -type HubCheckState struct { - *scevt.HubCheckState -} - -func NewHubCheckState(evt *scevt.HubCheckState) *HubCheckState { - return &HubCheckState{ - HubCheckState: evt, - } -} - -func (t *HubCheckState) TryMerge(other Event) bool { - event, ok := other.(*HubCheckState) - if !ok { - return false - } - - return t.HubID == event.HubID -} - -func (t *HubCheckState) Execute(execCtx ExecuteContext) { - log := logger.WithType[HubCheckState]("Event") - log.Debugf("begin with %v", logger.FormatStruct(t.HubCheckState)) - defer log.Debugf("end") - - hub, err := execCtx.Args.DB.Hub().GetByID(execCtx.Args.DB.DefCtx(), t.HubID) - if err == sql.ErrNoRows { - return - } - - if err != nil { - log.WithField("HubID", t.HubID).Warnf("get hub by id failed, err: %s", err.Error()) - return - } - - agtCli, err := stgglb.HubMQPool.Acquire(t.HubID) - if err != nil { - log.WithField("HubID", t.HubID).Warnf("create hub client failed, err: %s", err.Error()) - return - } - defer stgglb.HubMQPool.Release(agtCli) - - _, err = agtCli.GetState(agtmq.NewGetState(), mq.RequestOption{Timeout: time.Second * 30}) - if err != nil { - log.WithField("HubID", t.HubID).Warnf("getting state: %s", err.Error()) - - // 检查上次上报时间,超时的设置为不可用 - // TODO 没有上报过是否要特殊处理? - if hub.LastReportTime != nil && time.Since(*hub.LastReportTime) > time.Duration(config.Cfg().HubUnavailableSeconds)*time.Second { - err := execCtx.Args.DB.Hub().UpdateState(execCtx.Args.DB.DefCtx(), t.HubID, consts.HubStateUnavailable) - if err != nil { - log.WithField("HubID", t.HubID).Warnf("set hub state failed, err: %s", err.Error()) - } - } - return - } - - // TODO 如果以后还有其他的状态,要判断哪些状态下能设置Normal - err = execCtx.Args.DB.Hub().UpdateState(execCtx.Args.DB.DefCtx(), t.HubID, consts.HubStateNormal) - if err != nil { - log.WithField("HubID", t.HubID).Warnf("change hub state failed, err: %s", err.Error()) - } -} - -func init() { - RegisterMessageConvertor(NewHubCheckState) -} diff --git a/scanner/internal/event/event.go b/scanner/internal/event/event.go deleted file mode 100644 index a548772..0000000 --- a/scanner/internal/event/event.go +++ /dev/null @@ -1,55 +0,0 @@ -package event - -import ( - "fmt" - "reflect" - - "gitlink.org.cn/cloudream/common/pkgs/distlock" - event "gitlink.org.cn/cloudream/common/pkgs/event" - "gitlink.org.cn/cloudream/common/pkgs/typedispatcher" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/db2" - scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/agtpool" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent" -) - -type ExecuteArgs struct { - DB *db2.DB - DistLock *distlock.Service - StgMgr *agtpool.HubPool - EvtPub *sysevent.Publisher -} - -type Executor = event.Executor[ExecuteArgs] - -type ExecuteContext = event.ExecuteContext[ExecuteArgs] - -type Event = event.Event[ExecuteArgs] - -type ExecuteOption = event.ExecuteOption - -func NewExecutor(db *db2.DB, distLock *distlock.Service, stgAgts *agtpool.HubPool, evtPub *sysevent.Publisher) Executor { - return event.NewExecutor(ExecuteArgs{ - DB: db, - DistLock: distLock, - StgMgr: stgAgts, - EvtPub: evtPub, - }) -} - -var msgDispatcher = typedispatcher.NewTypeDispatcher[Event]() - -func FromMessage(msg scevt.Event) (Event, error) { - event, ok := msgDispatcher.Dispatch(msg) - if !ok { - return nil, fmt.Errorf("unknow event message type: %s", reflect.TypeOf(msg).String()) - } - - return event, nil -} - -func RegisterMessageConvertor[T any, TEvt Event](converter func(msg T) TEvt) { - typedispatcher.Add(msgDispatcher, func(msg T) Event { - return converter(msg) - }) -} diff --git a/scanner/internal/event/event_test.go b/scanner/internal/event/event_test.go deleted file mode 100644 index e71cd26..0000000 --- a/scanner/internal/event/event_test.go +++ /dev/null @@ -1,77 +0,0 @@ -package event - -/* -import ( - "testing" - - "github.com/samber/lo" - . "github.com/smartystreets/goconvey/convey" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" -) - -func Test_chooseSoManyNodes(t *testing.T) { - testcases := []struct { - title string - allNodes []*StorageLoadInfo - count int - expectedHubIDs []cdssdk.HubID - }{ - { - title: "节点数量充足", - allNodes: []*StorageLoadInfo{ - {Storage: cdssdk.Node{HubID: cdssdk.HubID(1)}}, - {Storage: cdssdk.Node{HubID: cdssdk.HubID(2)}}, - }, - count: 2, - expectedHubIDs: []cdssdk.HubID{1, 2}, - }, - { - title: "节点数量超过", - allNodes: []*StorageLoadInfo{ - {Storage: cdssdk.Node{HubID: cdssdk.HubID(1)}}, - {Storage: cdssdk.Node{HubID: cdssdk.HubID(2)}}, - {Storage: cdssdk.Node{HubID: cdssdk.HubID(3)}}, - }, - count: 2, - expectedHubIDs: []cdssdk.HubID{1, 2}, - }, - { - title: "只有一个节点,节点数量不够", - allNodes: []*StorageLoadInfo{ - {Storage: cdssdk.Node{HubID: cdssdk.HubID(1)}}, - }, - count: 3, - expectedHubIDs: []cdssdk.HubID{1, 1, 1}, - }, - { - title: "多个同地区节点,节点数量不够", - allNodes: []*StorageLoadInfo{ - {Storage: cdssdk.Node{HubID: cdssdk.HubID(1)}}, - {Storage: cdssdk.Node{HubID: cdssdk.HubID(2)}}, - }, - count: 5, - expectedHubIDs: []cdssdk.HubID{1, 1, 1, 2, 2}, - }, - { - title: "节点数量不够,且在不同地区", - allNodes: []*StorageLoadInfo{ - {Storage: cdssdk.Node{HubID: cdssdk.HubID(1), LocationID: cdssdk.LocationID(1)}}, - {Storage: cdssdk.Node{HubID: cdssdk.HubID(2), LocationID: cdssdk.LocationID(2)}}, - }, - count: 5, - expectedHubIDs: []cdssdk.HubID{1, 2, 1, 2, 1}, - }, - } - - for _, test := range testcases { - Convey(test.title, t, func() { - var t CheckPackageRedundancy - chosenNodes := t.chooseSoManyNodes(test.count, test.allNodes) - - chosenHubIDs := lo.Map(chosenNodes, func(item *StorageLoadInfo, idx int) cdssdk.HubID { return item.Storage.HubID }) - - So(chosenHubIDs, ShouldResemble, test.expectedHubIDs) - }) - } -} -*/ diff --git a/scanner/internal/mq/event.go b/scanner/internal/mq/event.go deleted file mode 100644 index 7717e5c..0000000 --- a/scanner/internal/mq/event.go +++ /dev/null @@ -1,20 +0,0 @@ -package mq - -import ( - "gitlink.org.cn/cloudream/common/pkgs/logger" - scmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner" - "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/event" -) - -func (svc *Service) PostEvent(msg *scmq.PostEvent) { - evt, err := event.FromMessage(msg.Event) - if err != nil { - logger.Warnf("create event from event message failed, err: %s", err.Error()) - return - } - - svc.eventExecutor.Post(evt, event.ExecuteOption{ - IsEmergency: msg.IsEmergency, - DontMerge: msg.DontMerge, - }) -} diff --git a/scanner/internal/mq/service.go b/scanner/internal/mq/service.go deleted file mode 100644 index 37b12c3..0000000 --- a/scanner/internal/mq/service.go +++ /dev/null @@ -1,15 +0,0 @@ -package mq - -import ( - "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/event" -) - -type Service struct { - eventExecutor *event.Executor -} - -func NewService(eventExecutor *event.Executor) *Service { - return &Service{ - eventExecutor: eventExecutor, - } -} diff --git a/scanner/internal/tickevent/check_agent_state.go b/scanner/internal/tickevent/check_agent_state.go deleted file mode 100644 index 92cb73e..0000000 --- a/scanner/internal/tickevent/check_agent_state.go +++ /dev/null @@ -1,33 +0,0 @@ -package tickevent - -import ( - "gitlink.org.cn/cloudream/common/pkgs/logger" - scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event" - "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/event" -) - -type CheckHubState struct { -} - -func NewCheckHubState() *CheckHubState { - return &CheckHubState{} -} - -func (e *CheckHubState) Execute(ctx ExecuteContext) { - log := logger.WithType[CheckHubState]("TickEvent") - log.Debugf("begin") - defer log.Debugf("end") - - hubs, err := ctx.Args.DB.Hub().GetAllHubs(ctx.Args.DB.DefCtx()) - if err != nil { - log.Warnf("get all hubs failed, err: %s", err.Error()) - return - } - - for _, hub := range hubs { - ctx.Args.EventExecutor.Post(event.NewHubCheckState(scevt.NewHubCheckState(hub.HubID)), event.ExecuteOption{ - IsEmergency: true, - DontMerge: true, - }) - } -} diff --git a/scanner/internal/tickevent/tick_event.go b/scanner/internal/tickevent/tick_event.go deleted file mode 100644 index 734233b..0000000 --- a/scanner/internal/tickevent/tick_event.go +++ /dev/null @@ -1,24 +0,0 @@ -package tickevent - -import ( - tickevent "gitlink.org.cn/cloudream/common/pkgs/tickevent" - mydb "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/db2" - "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/event" -) - -type ExecuteArgs struct { - EventExecutor *event.Executor - DB *mydb.DB -} - -type StartOption = tickevent.StartOption - -type Executor = tickevent.Executor[ExecuteArgs] - -type ExecuteContext = tickevent.ExecuteContext[ExecuteArgs] - -type Event = tickevent.TickEvent[ExecuteArgs] - -func NewExecutor(args ExecuteArgs) Executor { - return tickevent.NewExecutor(args) -} diff --git a/scanner/main.go b/scanner/main.go deleted file mode 100644 index 0b3391d..0000000 --- a/scanner/main.go +++ /dev/null @@ -1,199 +0,0 @@ -package main - -import ( - "context" - "fmt" - "os" - - "gitlink.org.cn/cloudream/common/pkgs/logger" - stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" - stgmod "gitlink.org.cn/cloudream/jcs-pub/common/models" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/db2" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock" - agtrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/grpc/hub" - scmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/agtpool" - "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent" - "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/config" - "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/event" - "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/mq" - "gitlink.org.cn/cloudream/jcs-pub/scanner/internal/tickevent" -) - -func main() { - err := config.Init() - if err != nil { - fmt.Printf("init config failed, err: %s", err.Error()) - os.Exit(1) - } - - err = logger.Init(&config.Cfg().Logger) - if err != nil { - fmt.Printf("init logger failed, err: %s", err.Error()) - os.Exit(1) - } - - db, err := db2.NewDB(&config.Cfg().DB) - if err != nil { - logger.Fatalf("new db failed, err: %s", err.Error()) - } - - stgglb.InitMQPool(config.Cfg().RabbitMQ) - - stgglb.InitHubRPCPool(&agtrpc.PoolConfig{}) - - // 启动分布式锁服务 - distlockSvc, err := distlock.NewService(&config.Cfg().DistLock) - if err != nil { - logger.Warnf("new distlock service failed, err: %s", err.Error()) - os.Exit(1) - } - go serveDistLock(distlockSvc) - - // 启动存储服务管理器 - stgAgts := agtpool.NewPool() - - // 初始化系统事件发布器 - evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &stgmod.SourceScanner{}) - if err != nil { - logger.Errorf("new sysevent publisher: %v", err) - os.Exit(1) - } - go servePublisher(evtPub) - - // 启动事件执行器 - eventExecutor := event.NewExecutor(db, distlockSvc, stgAgts, evtPub) - go serveEventExecutor(&eventExecutor) - - agtSvr, err := scmq.NewServer(mq.NewService(&eventExecutor), config.Cfg().RabbitMQ) - if err != nil { - logger.Fatalf("new hub server failed, err: %s", err.Error()) - } - agtSvr.OnError(func(err error) { - logger.Warnf("hub server err: %s", err.Error()) - }) - go serveScannerServer(agtSvr) - - tickExecutor := tickevent.NewExecutor(tickevent.ExecuteArgs{ - EventExecutor: &eventExecutor, - DB: db, - }) - startTickEvent(&tickExecutor) - - forever := make(chan struct{}) - <-forever -} - -func serveEventExecutor(executor *event.Executor) { - logger.Info("start serving event executor") - - err := executor.Execute() - - if err != nil { - logger.Errorf("event executor stopped with error: %s", err.Error()) - } - - logger.Info("event executor stopped") - - // TODO 仅简单结束了程序 - os.Exit(1) -} - -func servePublisher(evtPub *sysevent.Publisher) { - logger.Info("start serving sysevent publisher") - - ch := evtPub.Start() - -loop: - for { - val, err := ch.Receive().Wait(context.Background()) - if err != nil { - logger.Errorf("sysevent publisher stopped with error: %s", err.Error()) - break - } - - switch val := val.(type) { - case sysevent.PublishError: - logger.Errorf("publishing event: %v", val) - - case sysevent.PublisherExited: - if val.Err != nil { - logger.Errorf("publisher exited with error: %v", val.Err) - } else { - logger.Info("publisher exited") - } - break loop - - case sysevent.OtherError: - logger.Errorf("sysevent: %v", val) - } - } - logger.Info("sysevent publisher stopped") - - // TODO 仅简单结束了程序 - os.Exit(1) -} - -func serveScannerServer(server *scmq.Server) { - logger.Info("start serving scanner server") - - ch := server.Start() - if ch == nil { - logger.Errorf("RabbitMQ logEvent is nil") - os.Exit(1) - } - - for { - val, err := ch.Receive() - if err != nil { - logger.Errorf("command server stopped with error: %s", err.Error()) - break - } - - switch val := val.(type) { - case error: - logger.Errorf("rabbitmq connect with error: %v", val) - case int: - if val == 1 { - break - } - } - } - logger.Info("command server stopped") - - // TODO 仅简单结束了程序 - os.Exit(1) -} - -func serveDistLock(svc *distlock.Service) { - logger.Info("start serving distlock") - - err := svc.Serve() - - if err != nil { - logger.Errorf("distlock stopped with error: %s", err.Error()) - } - - logger.Info("distlock stopped") - - // TODO 仅简单结束了程序 - os.Exit(1) -} - -func startTickEvent(tickExecutor *tickevent.Executor) { - // TODO 可以考虑增加配置文件,配置这些任务间隔时间 - - interval := 5 * 60 * 1000 - - tickExecutor.Start(tickevent.NewBatchAllHubCheckShardStore(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) - - tickExecutor.Start(tickevent.NewStorageGC(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) - - tickExecutor.Start(tickevent.NewCheckHubState(), 5*60*1000, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) - - tickExecutor.Start(tickevent.NewBatchCheckPackageRedundancy(), interval, tickevent.StartOption{RandomStartDelayMs: 20 * 60 * 1000}) - - tickExecutor.Start(tickevent.NewBatchCleanPinned(), interval, tickevent.StartOption{RandomStartDelayMs: 20 * 60 * 1000}) - - tickExecutor.Start(tickevent.NewUpdateAllPackageAccessStatAmount(), interval, tickevent.StartOption{RandomStartDelayMs: 20 * 60 * 1000}) -}