|
|
|
@@ -80,7 +80,7 @@ type RabbitMQServer struct { |
|
|
|
connection *amqp.Connection |
|
|
|
channel *amqp.Channel |
|
|
|
closed chan any |
|
|
|
config RabbitMQParam |
|
|
|
config Config |
|
|
|
|
|
|
|
OnMessage MessageHandlerFn |
|
|
|
OnError func(err error) |
|
|
|
@@ -91,17 +91,17 @@ type RabbitMQParam struct { |
|
|
|
RetryInterval int `json:"retryInterval"` |
|
|
|
} |
|
|
|
|
|
|
|
func NewRabbitMQServer(url string, queueName string, onMessage MessageHandlerFn, cfg RabbitMQParam) (*RabbitMQServer, error) { |
|
|
|
func NewRabbitMQServer(cfg Config, queueName string, onMessage MessageHandlerFn) (*RabbitMQServer, error) { |
|
|
|
config := amqp.Config{ |
|
|
|
Vhost: cfg.VHost, |
|
|
|
Dial: func(network, addr string) (net.Conn, error) { |
|
|
|
return net.DialTimeout(network, addr, 60*time.Second) // 设置连接超时时间为 60 秒 |
|
|
|
}, |
|
|
|
} |
|
|
|
connection, err := amqp.DialConfig(url, config) |
|
|
|
connection, err := amqp.DialConfig(fmt.Sprintf("amqp://%s:%s@%s", cfg.Account, cfg.Password, cfg.Address), config) |
|
|
|
|
|
|
|
//connection, err := amqp.Dial(url) |
|
|
|
if err != nil { |
|
|
|
return nil, fmt.Errorf("connecting to %s: %w", url, err) |
|
|
|
return nil, fmt.Errorf("connecting to %s: %w", cfg.Address, err) |
|
|
|
} |
|
|
|
|
|
|
|
channel, err := connection.Channel() |
|
|
|
@@ -137,13 +137,13 @@ func (s *RabbitMQServer) Start() *sync2.UnboundChannel[RabbitMQServerEvent] { |
|
|
|
select { |
|
|
|
case rawReq, ok := <-channel: |
|
|
|
if !ok { |
|
|
|
if retryNum > s.config.RetryNum { |
|
|
|
if retryNum > s.config.Param.RetryNum { |
|
|
|
ch.Send(ServerExit{Error: fmt.Errorf("maximum number of retries exceeded")}) |
|
|
|
return ch |
|
|
|
} |
|
|
|
retryNum++ |
|
|
|
|
|
|
|
time.Sleep(time.Duration(s.config.RetryInterval) * time.Millisecond) |
|
|
|
time.Sleep(time.Duration(s.config.Param.RetryInterval) * time.Millisecond) |
|
|
|
channel = s.openChannel(ch) |
|
|
|
continue |
|
|
|
} |
|
|
|
|