From 102b7da935711552c2de646851e67ddc4456c475 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 30 Aug 2023 16:48:57 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96serder?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/services/scanner.go | 6 ++-- common/pkgs/mq/scanner/event.go | 38 +++++++----------------- common/pkgs/mq/scanner/event/event.go | 18 +++-------- coordinator/internal/services/package.go | 7 +++-- magefiles/main.go | 11 ++----- scanner/internal/event/event.go | 3 +- scanner/internal/services/event.go | 10 +------ 7 files changed, 29 insertions(+), 64 deletions(-) diff --git a/client/internal/services/scanner.go b/client/internal/services/scanner.go index 7acb249..2ad0869 100644 --- a/client/internal/services/scanner.go +++ b/client/internal/services/scanner.go @@ -4,6 +4,8 @@ import ( "fmt" "gitlink.org.cn/cloudream/storage/common/globals" + scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" + scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" ) type ScannerService struct { @@ -14,14 +16,14 @@ func (svc *Service) ScannerSvc() *ScannerService { return &ScannerService{Service: svc} } -func (svc *ScannerService) PostEvent(event any, isEmergency bool, dontMerge bool) error { +func (svc *ScannerService) PostEvent(event scevt.Event, isEmergency bool, dontMerge bool) error { scCli, err := globals.ScannerMQPool.Acquire() if err != nil { return fmt.Errorf("new scacnner client: %w", err) } defer scCli.Close() - err = scCli.PostEvent(event, isEmergency, dontMerge) + err = scCli.PostEvent(scmq.NewPostEvent(event, isEmergency, dontMerge)) if err != nil { return fmt.Errorf("request to scanner failed, err: %w", err) } diff --git a/common/pkgs/mq/scanner/event.go b/common/pkgs/mq/scanner/event.go index c94dd2c..6062d01 100644 --- a/common/pkgs/mq/scanner/event.go +++ b/common/pkgs/mq/scanner/event.go @@ -1,9 +1,6 @@ package scanner import ( - "fmt" - "time" - "gitlink.org.cn/cloudream/common/pkgs/mq" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" ) @@ -16,35 +13,22 @@ type EventService interface { var _ = RegisterNoReply(EventService.PostEvent) type PostEvent struct { - Event map[string]any `json:"event"` - IsEmergency bool `json:"isEmergency"` // 重要消息,优先处理 - DontMerge bool `json:"dontMerge"` // 不可合并此消息 + Event scevt.Event `json:"event"` + IsEmergency bool `json:"isEmergency"` // 重要消息,优先处理 + DontMerge bool `json:"dontMerge"` // 不可合并此消息 } -func NewPostEvent(event any, isEmergency bool, dontMerge bool) (PostEvent, error) { - mp, err := scevt.MessageToMap(event) - if err != nil { - return PostEvent{}, fmt.Errorf("message to map failed, err: %w", err) - } - +func NewPostEvent(event scevt.Event, isEmergency bool, dontMerge bool) PostEvent { return PostEvent{ - Event: mp, + Event: event, IsEmergency: isEmergency, DontMerge: dontMerge, - }, nil -} -func (cli *Client) PostEvent(event any, isEmergency bool, dontMerge bool, opts ...mq.SendOption) error { - opt := mq.SendOption{ - Timeout: time.Second * 30, - } - if len(opts) > 0 { - opt = opts[0] - } - - body, err := NewPostEvent(event, isEmergency, dontMerge) - if err != nil { - return fmt.Errorf("new post event body failed, err: %w", err) } +} +func (client *Client) PostEvent(msg PostEvent) error { + return mq.Send[PostEvent](client.rabbitCli, msg) +} - return mq.Send(cli.rabbitCli, body, opt) +func init() { + mq.RegisterUnionType(scevt.EventTypeUnino) } diff --git a/common/pkgs/mq/scanner/event/event.go b/common/pkgs/mq/scanner/event/event.go index 9281ca2..d989d5f 100644 --- a/common/pkgs/mq/scanner/event/event.go +++ b/common/pkgs/mq/scanner/event/event.go @@ -5,21 +5,11 @@ import ( "gitlink.org.cn/cloudream/common/utils/serder" ) -var typeResolver = serder.NewTypeNameResolver(true) +type Event interface{} -var serderOption = serder.TypedSerderOption{ - TypeResolver: &typeResolver, - TypeFieldName: "@type", -} - -func MapToMessage(m map[string]any) (any, error) { - return serder.TypedMapToObject(m, serderOption) -} - -func MessageToMap(msg any) (map[string]any, error) { - return serder.ObjectToTypedMap(msg, serderOption) -} +var eventUnionEles = serder.NewTypeNameResolver(true) +var EventTypeUnino = serder.NewTypeUnion[Event]("@type", eventUnionEles) func Register[T any]() { - typeResolver.Register(myreflect.TypeOf[T]()) + eventUnionEles.Register(myreflect.TypeOf[T]()) } diff --git a/coordinator/internal/services/package.go b/coordinator/internal/services/package.go index 02adee3..4098dd5 100644 --- a/coordinator/internal/services/package.go +++ b/coordinator/internal/services/package.go @@ -11,6 +11,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" + scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" ) @@ -94,7 +95,7 @@ func (svc *Service) UpdateRepPackage(msg *coormq.UpdateRepPackage) (*coormq.Upda affectFileHashes = append(affectFileHashes, add.FileHash) } - err = svc.scanner.PostEvent(scevt.NewCheckRepCount(affectFileHashes), true, true) + err = svc.scanner.PostEvent(scmq.NewPostEvent(scevt.NewCheckRepCount(affectFileHashes), true, true)) if err != nil { logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error()) } @@ -170,7 +171,7 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack // 不追求及时、准确 if len(stgs) == 0 { // 如果没有被引用,直接投递CheckPackage的任务 - err := svc.scanner.PostEvent(scevt.NewCheckPackage([]int64{msg.PackageID}), false, false) + err := svc.scanner.PostEvent(scmq.NewPostEvent(scevt.NewCheckPackage([]int64{msg.PackageID}), false, false)) if err != nil { logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) } @@ -179,7 +180,7 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack } else { // 有引用则让Agent去检查StoragePackage for _, stg := range stgs { - err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int64{msg.PackageID}), false, false) + err := svc.scanner.PostEvent(scmq.NewPostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int64{msg.PackageID}), false, false)) if err != nil { logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) } diff --git a/magefiles/main.go b/magefiles/main.go index f59c3cd..bd70eb4 100644 --- a/magefiles/main.go +++ b/magefiles/main.go @@ -17,24 +17,19 @@ const ( BuildDir = "./build" ) -var Global = struct { - OS string - Arch string -}{} - // [配置项]设置编译平台为windows func Win() { - Global.OS = "win" + magefiles.Global.OS = "win" } // [配置项]设置编译平台为linux func Linux() { - Global.OS = "linux" + magefiles.Global.OS = "linux" } // [配置项]设置编译架构为amd64 func AMD64() { - Global.Arch = "amd64" + magefiles.Global.Arch = "amd64" } func All() error { diff --git a/scanner/internal/event/event.go b/scanner/internal/event/event.go index e117dc1..27a1aba 100644 --- a/scanner/internal/event/event.go +++ b/scanner/internal/event/event.go @@ -8,6 +8,7 @@ import ( event "gitlink.org.cn/cloudream/common/pkgs/event" "gitlink.org.cn/cloudream/common/pkgs/typedispatcher" mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db" + scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" ) type ExecuteArgs struct { @@ -32,7 +33,7 @@ func NewExecutor(db *mydb.DB, distLock *distlocksvc.Service) Executor { var msgDispatcher = typedispatcher.NewTypeDispatcher[Event]() -func FromMessage(msg any) (Event, error) { +func FromMessage(msg scevt.Event) (Event, error) { event, ok := msgDispatcher.Dispatch(msg) if !ok { return nil, fmt.Errorf("unknow event message type: %s", reflect.TypeOf(msg).Name()) diff --git a/scanner/internal/services/event.go b/scanner/internal/services/event.go index c7ceb2a..9333c13 100644 --- a/scanner/internal/services/event.go +++ b/scanner/internal/services/event.go @@ -3,19 +3,11 @@ package services import ( "gitlink.org.cn/cloudream/common/pkgs/logger" scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" - scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" "gitlink.org.cn/cloudream/storage/scanner/internal/event" ) func (svc *Service) PostEvent(msg *scmq.PostEvent) { - - evtMsg, err := scevt.MapToMessage(msg.Event) - if err != nil { - logger.Warnf("convert map to event message failed, err: %s", err.Error()) - return - } - - evt, err := event.FromMessage(evtMsg) + evt, err := event.FromMessage(msg.Event) if err != nil { logger.Warnf("create event from event message failed, err: %s", err.Error()) return