diff --git a/agent/main.go b/agent/main.go index 1a449ee..25c8eee 100644 --- a/agent/main.go +++ b/agent/main.go @@ -4,7 +4,6 @@ import ( "fmt" "net" "os" - "sync" log "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" @@ -97,10 +96,6 @@ func main() { dlder := downloader.NewDownloader(config.Cfg().Downloader) - //处置协调端、客户端命令(可多建几个) - wg := sync.WaitGroup{} - wg.Add(4) - taskMgr := task.NewManager(distlock, &sw, &conCol, &dlder) // 启动命令服务器 @@ -112,8 +107,7 @@ func main() { agtSvr.OnError(func(err error) { log.Warnf("agent server err: %s", err.Error()) }) - - go serveAgentServer(agtSvr, &wg) + go serveAgentServer(agtSvr) //面向客户端收发数据 listenAddr := config.Cfg().GRPC.MakeListenAddress() @@ -121,17 +115,17 @@ func main() { if err != nil { log.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error()) } - s := grpc.NewServer() agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&sw)) - go serveGRPC(s, lis, &wg) + go serveGRPC(s, lis) go serveDistLock(distlock) - wg.Wait() + foever := make(chan struct{}) + <-foever } -func serveAgentServer(server *agtmq.Server, wg *sync.WaitGroup) { +func serveAgentServer(server *agtmq.Server) { log.Info("start serving command server") err := server.Serve() @@ -142,10 +136,11 @@ func serveAgentServer(server *agtmq.Server, wg *sync.WaitGroup) { log.Info("command server stopped") - wg.Done() + // TODO 仅简单结束了程序 + os.Exit(1) } -func serveGRPC(s *grpc.Server, lis net.Listener, wg *sync.WaitGroup) { +func serveGRPC(s *grpc.Server, lis net.Listener) { log.Info("start serving grpc") err := s.Serve(lis) @@ -156,7 +151,8 @@ func serveGRPC(s *grpc.Server, lis net.Listener, wg *sync.WaitGroup) { log.Info("grpc stopped") - wg.Done() + // TODO 仅简单结束了程序 + os.Exit(1) } func serveDistLock(svc *distlock.Service) { @@ -169,4 +165,7 @@ func serveDistLock(svc *distlock.Service) { } log.Info("distlock stopped") + + // TODO 仅简单结束了程序 + os.Exit(1) } diff --git a/client/main.go b/client/main.go index 0b6b3cc..7020ec8 100644 --- a/client/main.go +++ b/client/main.go @@ -79,4 +79,7 @@ func serveDistLock(svc *distlock.Service) { } logger.Info("distlock stopped") + + // TODO 仅简单结束了程序 + os.Exit(1) } diff --git a/coordinator/main.go b/coordinator/main.go index ccf65be..90b2d1b 100644 --- a/coordinator/main.go +++ b/coordinator/main.go @@ -54,4 +54,7 @@ func serveCoorServer(server *coormq.Server) { } logger.Info("command server stopped") + + // TODO 仅简单结束了程序 + os.Exit(1) } diff --git a/scanner/main.go b/scanner/main.go index 88d6a5a..c89b846 100644 --- a/scanner/main.go +++ b/scanner/main.go @@ -3,7 +3,6 @@ package main import ( "fmt" "os" - "sync" "gitlink.org.cn/cloudream/common/pkgs/logger" stgglb "gitlink.org.cn/cloudream/storage/common/globals" @@ -36,18 +35,15 @@ func main() { stgglb.InitMQPool(&config.Cfg().RabbitMQ) - wg := sync.WaitGroup{} - wg.Add(3) - distlockSvc, err := distlock.NewService(&config.Cfg().DistLock) if err != nil { logger.Warnf("new distlock service failed, err: %s", err.Error()) os.Exit(1) } - go serveDistLock(distlockSvc, &wg) + go serveDistLock(distlockSvc) eventExecutor := event.NewExecutor(db, distlockSvc) - go serveEventExecutor(&eventExecutor, &wg) + go serveEventExecutor(&eventExecutor) agtSvr, err := scmq.NewServer(mq.NewService(&eventExecutor), &config.Cfg().RabbitMQ) if err != nil { @@ -56,8 +52,7 @@ func main() { agtSvr.OnError(func(err error) { logger.Warnf("agent server err: %s", err.Error()) }) - - go serveScannerServer(agtSvr, &wg) + go serveScannerServer(agtSvr) tickExecutor := tickevent.NewExecutor(tickevent.ExecuteArgs{ EventExecutor: &eventExecutor, @@ -65,10 +60,11 @@ func main() { }) startTickEvent(&tickExecutor) - wg.Wait() + forever := make(chan struct{}) + <-forever } -func serveEventExecutor(executor *event.Executor, wg *sync.WaitGroup) { +func serveEventExecutor(executor *event.Executor) { logger.Info("start serving event executor") err := executor.Execute() @@ -79,10 +75,11 @@ func serveEventExecutor(executor *event.Executor, wg *sync.WaitGroup) { logger.Info("event executor stopped") - wg.Done() + // TODO 仅简单结束了程序 + os.Exit(1) } -func serveScannerServer(server *scmq.Server, wg *sync.WaitGroup) { +func serveScannerServer(server *scmq.Server) { logger.Info("start serving scanner server") err := server.Serve() @@ -93,10 +90,11 @@ func serveScannerServer(server *scmq.Server, wg *sync.WaitGroup) { logger.Info("scanner server stopped") - wg.Done() + // TODO 仅简单结束了程序 + os.Exit(1) } -func serveDistLock(svc *distlock.Service, wg *sync.WaitGroup) { +func serveDistLock(svc *distlock.Service) { logger.Info("start serving distlock") err := svc.Serve() @@ -107,7 +105,8 @@ func serveDistLock(svc *distlock.Service, wg *sync.WaitGroup) { logger.Info("distlock stopped") - wg.Done() + // TODO 仅简单结束了程序 + os.Exit(1) } func startTickEvent(tickExecutor *tickevent.Executor) {