Browse Source

在服务内部某个模块出问题时直接退出程序

gitlink
Sydonian 2 years ago
parent
commit
4d09604769
4 changed files with 33 additions and 29 deletions
  1. +13
    -14
      agent/main.go
  2. +3
    -0
      client/main.go
  3. +3
    -0
      coordinator/main.go
  4. +14
    -15
      scanner/main.go

+ 13
- 14
agent/main.go View File

@@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"net" "net"
"os" "os"
"sync"


log "gitlink.org.cn/cloudream/common/pkgs/logger" log "gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
@@ -97,10 +96,6 @@ func main() {


dlder := downloader.NewDownloader(config.Cfg().Downloader) dlder := downloader.NewDownloader(config.Cfg().Downloader)


//处置协调端、客户端命令(可多建几个)
wg := sync.WaitGroup{}
wg.Add(4)

taskMgr := task.NewManager(distlock, &sw, &conCol, &dlder) taskMgr := task.NewManager(distlock, &sw, &conCol, &dlder)


// 启动命令服务器 // 启动命令服务器
@@ -112,8 +107,7 @@ func main() {
agtSvr.OnError(func(err error) { agtSvr.OnError(func(err error) {
log.Warnf("agent server err: %s", err.Error()) log.Warnf("agent server err: %s", err.Error())
}) })

go serveAgentServer(agtSvr, &wg)
go serveAgentServer(agtSvr)


//面向客户端收发数据 //面向客户端收发数据
listenAddr := config.Cfg().GRPC.MakeListenAddress() listenAddr := config.Cfg().GRPC.MakeListenAddress()
@@ -121,17 +115,17 @@ func main() {
if err != nil { if err != nil {
log.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error()) log.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error())
} }

s := grpc.NewServer() s := grpc.NewServer()
agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&sw)) agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&sw))
go serveGRPC(s, lis, &wg)
go serveGRPC(s, lis)


go serveDistLock(distlock) go serveDistLock(distlock)


wg.Wait()
foever := make(chan struct{})
<-foever
} }


func serveAgentServer(server *agtmq.Server, wg *sync.WaitGroup) {
func serveAgentServer(server *agtmq.Server) {
log.Info("start serving command server") log.Info("start serving command server")


err := server.Serve() err := server.Serve()
@@ -142,10 +136,11 @@ func serveAgentServer(server *agtmq.Server, wg *sync.WaitGroup) {


log.Info("command server stopped") log.Info("command server stopped")


wg.Done()
// TODO 仅简单结束了程序
os.Exit(1)
} }


func serveGRPC(s *grpc.Server, lis net.Listener, wg *sync.WaitGroup) {
func serveGRPC(s *grpc.Server, lis net.Listener) {
log.Info("start serving grpc") log.Info("start serving grpc")


err := s.Serve(lis) err := s.Serve(lis)
@@ -156,7 +151,8 @@ func serveGRPC(s *grpc.Server, lis net.Listener, wg *sync.WaitGroup) {


log.Info("grpc stopped") log.Info("grpc stopped")


wg.Done()
// TODO 仅简单结束了程序
os.Exit(1)
} }


func serveDistLock(svc *distlock.Service) { func serveDistLock(svc *distlock.Service) {
@@ -169,4 +165,7 @@ func serveDistLock(svc *distlock.Service) {
} }


log.Info("distlock stopped") log.Info("distlock stopped")

// TODO 仅简单结束了程序
os.Exit(1)
} }

+ 3
- 0
client/main.go View File

@@ -79,4 +79,7 @@ func serveDistLock(svc *distlock.Service) {
} }


logger.Info("distlock stopped") logger.Info("distlock stopped")

// TODO 仅简单结束了程序
os.Exit(1)
} }

+ 3
- 0
coordinator/main.go View File

@@ -54,4 +54,7 @@ func serveCoorServer(server *coormq.Server) {
} }


logger.Info("command server stopped") logger.Info("command server stopped")

// TODO 仅简单结束了程序
os.Exit(1)
} }

+ 14
- 15
scanner/main.go View File

@@ -3,7 +3,6 @@ package main
import ( import (
"fmt" "fmt"
"os" "os"
"sync"


"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgglb "gitlink.org.cn/cloudream/storage/common/globals"
@@ -36,18 +35,15 @@ func main() {


stgglb.InitMQPool(&config.Cfg().RabbitMQ) stgglb.InitMQPool(&config.Cfg().RabbitMQ)


wg := sync.WaitGroup{}
wg.Add(3)

distlockSvc, err := distlock.NewService(&config.Cfg().DistLock) distlockSvc, err := distlock.NewService(&config.Cfg().DistLock)
if err != nil { if err != nil {
logger.Warnf("new distlock service failed, err: %s", err.Error()) logger.Warnf("new distlock service failed, err: %s", err.Error())
os.Exit(1) os.Exit(1)
} }
go serveDistLock(distlockSvc, &wg)
go serveDistLock(distlockSvc)


eventExecutor := event.NewExecutor(db, distlockSvc) eventExecutor := event.NewExecutor(db, distlockSvc)
go serveEventExecutor(&eventExecutor, &wg)
go serveEventExecutor(&eventExecutor)


agtSvr, err := scmq.NewServer(mq.NewService(&eventExecutor), &config.Cfg().RabbitMQ) agtSvr, err := scmq.NewServer(mq.NewService(&eventExecutor), &config.Cfg().RabbitMQ)
if err != nil { if err != nil {
@@ -56,8 +52,7 @@ func main() {
agtSvr.OnError(func(err error) { agtSvr.OnError(func(err error) {
logger.Warnf("agent server err: %s", err.Error()) logger.Warnf("agent server err: %s", err.Error())
}) })

go serveScannerServer(agtSvr, &wg)
go serveScannerServer(agtSvr)


tickExecutor := tickevent.NewExecutor(tickevent.ExecuteArgs{ tickExecutor := tickevent.NewExecutor(tickevent.ExecuteArgs{
EventExecutor: &eventExecutor, EventExecutor: &eventExecutor,
@@ -65,10 +60,11 @@ func main() {
}) })
startTickEvent(&tickExecutor) startTickEvent(&tickExecutor)


wg.Wait()
forever := make(chan struct{})
<-forever
} }


func serveEventExecutor(executor *event.Executor, wg *sync.WaitGroup) {
func serveEventExecutor(executor *event.Executor) {
logger.Info("start serving event executor") logger.Info("start serving event executor")


err := executor.Execute() err := executor.Execute()
@@ -79,10 +75,11 @@ func serveEventExecutor(executor *event.Executor, wg *sync.WaitGroup) {


logger.Info("event executor stopped") logger.Info("event executor stopped")


wg.Done()
// TODO 仅简单结束了程序
os.Exit(1)
} }


func serveScannerServer(server *scmq.Server, wg *sync.WaitGroup) {
func serveScannerServer(server *scmq.Server) {
logger.Info("start serving scanner server") logger.Info("start serving scanner server")


err := server.Serve() err := server.Serve()
@@ -93,10 +90,11 @@ func serveScannerServer(server *scmq.Server, wg *sync.WaitGroup) {


logger.Info("scanner server stopped") logger.Info("scanner server stopped")


wg.Done()
// TODO 仅简单结束了程序
os.Exit(1)
} }


func serveDistLock(svc *distlock.Service, wg *sync.WaitGroup) {
func serveDistLock(svc *distlock.Service) {
logger.Info("start serving distlock") logger.Info("start serving distlock")


err := svc.Serve() err := svc.Serve()
@@ -107,7 +105,8 @@ func serveDistLock(svc *distlock.Service, wg *sync.WaitGroup) {


logger.Info("distlock stopped") logger.Info("distlock stopped")


wg.Done()
// TODO 仅简单结束了程序
os.Exit(1)
} }


func startTickEvent(tickExecutor *tickevent.Executor) { func startTickEvent(tickExecutor *tickevent.Executor) {


Loading…
Cancel
Save