From d59ba9fd56435f1f4b8ea265b2a7e473332200af Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 5 Mar 2025 10:06:56 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E7=B3=BB=E7=BB=9F=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E4=B8=8D=E8=83=BD=E5=90=8C=E6=97=B6=E8=A2=AB=E5=A4=9A?= =?UTF-8?q?=E4=B8=AA=E6=B6=88=E8=B4=B9=E8=80=85=E6=B6=88=E8=B4=B9=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/pkgs/sysevent/publisher.go | 14 ++++---------- common/pkgs/sysevent/sysevent.go | 1 + common/pkgs/sysevent/watcher.go | 13 +++++++++++++ 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/common/pkgs/sysevent/publisher.go b/common/pkgs/sysevent/publisher.go index dfb482b..bb95065 100644 --- a/common/pkgs/sysevent/publisher.go +++ b/common/pkgs/sysevent/publisher.go @@ -48,16 +48,10 @@ func NewPublisher(cfg Config, thisSource Source) (*Publisher, error) { return nil, fmt.Errorf("openning channel on connection: %w", err) } - _, err = channel.QueueDeclare( - SysEventQueueName, - false, - true, - false, - false, - nil, - ) + err = channel.ExchangeDeclare(ExchangeName, "fanout", false, true, false, false, nil) if err != nil { - return nil, fmt.Errorf("declare queue: %w", err) + connection.Close() + return nil, fmt.Errorf("declare exchange: %w", err) } pub := &Publisher{ @@ -94,7 +88,7 @@ func (p *Publisher) Start() *async.UnboundChannel[PublisherEvent] { continue } - err = p.channel.Publish("", SysEventQueueName, false, false, amqp.Publishing{ + err = p.channel.Publish(ExchangeName, "", false, false, amqp.Publishing{ ContentType: "text/plain", Body: eventData, Expiration: "60000", // 消息超时时间默认1分钟 diff --git a/common/pkgs/sysevent/sysevent.go b/common/pkgs/sysevent/sysevent.go index 93c2a62..00feb1f 100644 --- a/common/pkgs/sysevent/sysevent.go +++ b/common/pkgs/sysevent/sysevent.go @@ -6,6 +6,7 @@ import ( const ( SysEventQueueName = "SysEventQueue" + ExchangeName = "SysEventExchange" ) type SysEvent = stgmod.SysEvent diff --git a/common/pkgs/sysevent/watcher.go b/common/pkgs/sysevent/watcher.go index c93cca5..98e0b6d 100644 --- a/common/pkgs/sysevent/watcher.go +++ b/common/pkgs/sysevent/watcher.go @@ -45,6 +45,12 @@ func NewWatcherHost(cfg Config) (*WatcherHost, error) { return nil, fmt.Errorf("openning channel on connection: %w", err) } + err = channel.ExchangeDeclare(ExchangeName, "fanout", false, true, false, false, nil) + if err != nil { + connection.Close() + return nil, fmt.Errorf("declare exchange: %w", err) + } + _, err = channel.QueueDeclare( SysEventQueueName, false, @@ -59,6 +65,13 @@ func NewWatcherHost(cfg Config) (*WatcherHost, error) { return nil, fmt.Errorf("declare queue: %w", err) } + err = channel.QueueBind(SysEventQueueName, "", ExchangeName, false, nil) + if err != nil { + channel.Close() + connection.Close() + return nil, fmt.Errorf("bind queue: %w", err) + } + recvChan, err := channel.Consume(SysEventQueueName, "", true, false, true, false, nil) if err != nil { channel.Close()