diff --git a/common/pkgs/sysevent/publisher.go b/common/pkgs/sysevent/publisher.go index dfb482b..bb95065 100644 --- a/common/pkgs/sysevent/publisher.go +++ b/common/pkgs/sysevent/publisher.go @@ -48,16 +48,10 @@ func NewPublisher(cfg Config, thisSource Source) (*Publisher, error) { return nil, fmt.Errorf("openning channel on connection: %w", err) } - _, err = channel.QueueDeclare( - SysEventQueueName, - false, - true, - false, - false, - nil, - ) + err = channel.ExchangeDeclare(ExchangeName, "fanout", false, true, false, false, nil) if err != nil { - return nil, fmt.Errorf("declare queue: %w", err) + connection.Close() + return nil, fmt.Errorf("declare exchange: %w", err) } pub := &Publisher{ @@ -94,7 +88,7 @@ func (p *Publisher) Start() *async.UnboundChannel[PublisherEvent] { continue } - err = p.channel.Publish("", SysEventQueueName, false, false, amqp.Publishing{ + err = p.channel.Publish(ExchangeName, "", false, false, amqp.Publishing{ ContentType: "text/plain", Body: eventData, Expiration: "60000", // 消息超时时间默认1分钟 diff --git a/common/pkgs/sysevent/sysevent.go b/common/pkgs/sysevent/sysevent.go index 93c2a62..00feb1f 100644 --- a/common/pkgs/sysevent/sysevent.go +++ b/common/pkgs/sysevent/sysevent.go @@ -6,6 +6,7 @@ import ( const ( SysEventQueueName = "SysEventQueue" + ExchangeName = "SysEventExchange" ) type SysEvent = stgmod.SysEvent diff --git a/common/pkgs/sysevent/watcher.go b/common/pkgs/sysevent/watcher.go index c93cca5..98e0b6d 100644 --- a/common/pkgs/sysevent/watcher.go +++ b/common/pkgs/sysevent/watcher.go @@ -45,6 +45,12 @@ func NewWatcherHost(cfg Config) (*WatcherHost, error) { return nil, fmt.Errorf("openning channel on connection: %w", err) } + err = channel.ExchangeDeclare(ExchangeName, "fanout", false, true, false, false, nil) + if err != nil { + connection.Close() + return nil, fmt.Errorf("declare exchange: %w", err) + } + _, err = channel.QueueDeclare( SysEventQueueName, false, @@ -59,6 +65,13 @@ func NewWatcherHost(cfg Config) (*WatcherHost, error) { return nil, fmt.Errorf("declare queue: %w", err) } + err = channel.QueueBind(SysEventQueueName, "", ExchangeName, false, nil) + if err != nil { + channel.Close() + connection.Close() + return nil, fmt.Errorf("bind queue: %w", err) + } + recvChan, err := channel.Consume(SysEventQueueName, "", true, false, true, false, nil) if err != nil { channel.Close()