diff --git a/pkgs/mq/server.go b/pkgs/mq/server.go index 7c65e6b..a9f581a 100644 --- a/pkgs/mq/server.go +++ b/pkgs/mq/server.go @@ -80,7 +80,7 @@ type RabbitMQServer struct { connection *amqp.Connection channel *amqp.Channel closed chan any - config RabbitMQParam + config Config OnMessage MessageHandlerFn OnError func(err error) @@ -91,17 +91,17 @@ type RabbitMQParam struct { RetryInterval int `json:"retryInterval"` } -func NewRabbitMQServer(url string, queueName string, onMessage MessageHandlerFn, cfg RabbitMQParam) (*RabbitMQServer, error) { +func NewRabbitMQServer(cfg Config, queueName string, onMessage MessageHandlerFn) (*RabbitMQServer, error) { config := amqp.Config{ + Vhost: cfg.VHost, Dial: func(network, addr string) (net.Conn, error) { return net.DialTimeout(network, addr, 60*time.Second) // 设置连接超时时间为 60 秒 }, } - connection, err := amqp.DialConfig(url, config) + connection, err := amqp.DialConfig(fmt.Sprintf("amqp://%s:%s@%s", cfg.Account, cfg.Password, cfg.Address), config) - //connection, err := amqp.Dial(url) if err != nil { - return nil, fmt.Errorf("connecting to %s: %w", url, err) + return nil, fmt.Errorf("connecting to %s: %w", cfg.Address, err) } channel, err := connection.Channel() @@ -137,13 +137,13 @@ func (s *RabbitMQServer) Start() *sync2.UnboundChannel[RabbitMQServerEvent] { select { case rawReq, ok := <-channel: if !ok { - if retryNum > s.config.RetryNum { + if retryNum > s.config.Param.RetryNum { ch.Send(ServerExit{Error: fmt.Errorf("maximum number of retries exceeded")}) return ch } retryNum++ - time.Sleep(time.Duration(s.config.RetryInterval) * time.Millisecond) + time.Sleep(time.Duration(s.config.Param.RetryInterval) * time.Millisecond) channel = s.openChannel(ch) continue }