From 0e15696a2afc8d1c370669ac0486ddd574c577d7 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 19 Dec 2024 09:24:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96mq=E9=85=8D=E7=BD=AE=E6=96=B9?= =?UTF-8?q?=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkgs/mq/server.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkgs/mq/server.go b/pkgs/mq/server.go index 7c65e6b..a9f581a 100644 --- a/pkgs/mq/server.go +++ b/pkgs/mq/server.go @@ -80,7 +80,7 @@ type RabbitMQServer struct { connection *amqp.Connection channel *amqp.Channel closed chan any - config RabbitMQParam + config Config OnMessage MessageHandlerFn OnError func(err error) @@ -91,17 +91,17 @@ type RabbitMQParam struct { RetryInterval int `json:"retryInterval"` } -func NewRabbitMQServer(url string, queueName string, onMessage MessageHandlerFn, cfg RabbitMQParam) (*RabbitMQServer, error) { +func NewRabbitMQServer(cfg Config, queueName string, onMessage MessageHandlerFn) (*RabbitMQServer, error) { config := amqp.Config{ + Vhost: cfg.VHost, Dial: func(network, addr string) (net.Conn, error) { return net.DialTimeout(network, addr, 60*time.Second) // 设置连接超时时间为 60 秒 }, } - connection, err := amqp.DialConfig(url, config) + connection, err := amqp.DialConfig(fmt.Sprintf("amqp://%s:%s@%s", cfg.Account, cfg.Password, cfg.Address), config) - //connection, err := amqp.Dial(url) if err != nil { - return nil, fmt.Errorf("connecting to %s: %w", url, err) + return nil, fmt.Errorf("connecting to %s: %w", cfg.Address, err) } channel, err := connection.Channel() @@ -137,13 +137,13 @@ func (s *RabbitMQServer) Start() *sync2.UnboundChannel[RabbitMQServerEvent] { select { case rawReq, ok := <-channel: if !ok { - if retryNum > s.config.RetryNum { + if retryNum > s.config.Param.RetryNum { ch.Send(ServerExit{Error: fmt.Errorf("maximum number of retries exceeded")}) return ch } retryNum++ - time.Sleep(time.Duration(s.config.RetryInterval) * time.Millisecond) + time.Sleep(time.Duration(s.config.Param.RetryInterval) * time.Millisecond) channel = s.openChannel(ch) continue }