Browse Source

解决系统事件不能同时被多个消费者消费的问题

gitlink
Sydonian 1 year ago
parent
commit
d59ba9fd56
3 changed files with 18 additions and 10 deletions
  1. +4
    -10
      common/pkgs/sysevent/publisher.go
  2. +1
    -0
      common/pkgs/sysevent/sysevent.go
  3. +13
    -0
      common/pkgs/sysevent/watcher.go

+ 4
- 10
common/pkgs/sysevent/publisher.go View File

@@ -48,16 +48,10 @@ func NewPublisher(cfg Config, thisSource Source) (*Publisher, error) {
return nil, fmt.Errorf("openning channel on connection: %w", err)
}

_, err = channel.QueueDeclare(
SysEventQueueName,
false,
true,
false,
false,
nil,
)
err = channel.ExchangeDeclare(ExchangeName, "fanout", false, true, false, false, nil)
if err != nil {
return nil, fmt.Errorf("declare queue: %w", err)
connection.Close()
return nil, fmt.Errorf("declare exchange: %w", err)
}

pub := &Publisher{
@@ -94,7 +88,7 @@ func (p *Publisher) Start() *async.UnboundChannel[PublisherEvent] {
continue
}

err = p.channel.Publish("", SysEventQueueName, false, false, amqp.Publishing{
err = p.channel.Publish(ExchangeName, "", false, false, amqp.Publishing{
ContentType: "text/plain",
Body: eventData,
Expiration: "60000", // 消息超时时间默认1分钟


+ 1
- 0
common/pkgs/sysevent/sysevent.go View File

@@ -6,6 +6,7 @@ import (

const (
SysEventQueueName = "SysEventQueue"
ExchangeName = "SysEventExchange"
)

type SysEvent = stgmod.SysEvent


+ 13
- 0
common/pkgs/sysevent/watcher.go View File

@@ -45,6 +45,12 @@ func NewWatcherHost(cfg Config) (*WatcherHost, error) {
return nil, fmt.Errorf("openning channel on connection: %w", err)
}

err = channel.ExchangeDeclare(ExchangeName, "fanout", false, true, false, false, nil)
if err != nil {
connection.Close()
return nil, fmt.Errorf("declare exchange: %w", err)
}

_, err = channel.QueueDeclare(
SysEventQueueName,
false,
@@ -59,6 +65,13 @@ func NewWatcherHost(cfg Config) (*WatcherHost, error) {
return nil, fmt.Errorf("declare queue: %w", err)
}

err = channel.QueueBind(SysEventQueueName, "", ExchangeName, false, nil)
if err != nil {
channel.Close()
connection.Close()
return nil, fmt.Errorf("bind queue: %w", err)
}

recvChan, err := channel.Consume(SysEventQueueName, "", true, false, true, false, nil)
if err != nil {
channel.Close()


Loading…
Cancel
Save