Browse Source

优化serder

gitlink
Sydonian 2 years ago
parent
commit
102b7da935
7 changed files with 29 additions and 64 deletions
  1. +4
    -2
      client/internal/services/scanner.go
  2. +11
    -27
      common/pkgs/mq/scanner/event.go
  3. +4
    -14
      common/pkgs/mq/scanner/event/event.go
  4. +4
    -3
      coordinator/internal/services/package.go
  5. +3
    -8
      magefiles/main.go
  6. +2
    -1
      scanner/internal/event/event.go
  7. +1
    -9
      scanner/internal/services/event.go

+ 4
- 2
client/internal/services/scanner.go View File

@@ -4,6 +4,8 @@ import (
"fmt" "fmt"


"gitlink.org.cn/cloudream/storage/common/globals" "gitlink.org.cn/cloudream/storage/common/globals"
scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner"
scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
) )


type ScannerService struct { type ScannerService struct {
@@ -14,14 +16,14 @@ func (svc *Service) ScannerSvc() *ScannerService {
return &ScannerService{Service: svc} return &ScannerService{Service: svc}
} }


func (svc *ScannerService) PostEvent(event any, isEmergency bool, dontMerge bool) error {
func (svc *ScannerService) PostEvent(event scevt.Event, isEmergency bool, dontMerge bool) error {
scCli, err := globals.ScannerMQPool.Acquire() scCli, err := globals.ScannerMQPool.Acquire()
if err != nil { if err != nil {
return fmt.Errorf("new scacnner client: %w", err) return fmt.Errorf("new scacnner client: %w", err)
} }
defer scCli.Close() defer scCli.Close()


err = scCli.PostEvent(event, isEmergency, dontMerge)
err = scCli.PostEvent(scmq.NewPostEvent(event, isEmergency, dontMerge))
if err != nil { if err != nil {
return fmt.Errorf("request to scanner failed, err: %w", err) return fmt.Errorf("request to scanner failed, err: %w", err)
} }


+ 11
- 27
common/pkgs/mq/scanner/event.go View File

@@ -1,9 +1,6 @@
package scanner package scanner


import ( import (
"fmt"
"time"

"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"
scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
) )
@@ -16,35 +13,22 @@ type EventService interface {
var _ = RegisterNoReply(EventService.PostEvent) var _ = RegisterNoReply(EventService.PostEvent)


type PostEvent struct { type PostEvent struct {
Event map[string]any `json:"event"`
IsEmergency bool `json:"isEmergency"` // 重要消息,优先处理
DontMerge bool `json:"dontMerge"` // 不可合并此消息
Event scevt.Event `json:"event"`
IsEmergency bool `json:"isEmergency"` // 重要消息,优先处理
DontMerge bool `json:"dontMerge"` // 不可合并此消息
} }


func NewPostEvent(event any, isEmergency bool, dontMerge bool) (PostEvent, error) {
mp, err := scevt.MessageToMap(event)
if err != nil {
return PostEvent{}, fmt.Errorf("message to map failed, err: %w", err)
}

func NewPostEvent(event scevt.Event, isEmergency bool, dontMerge bool) PostEvent {
return PostEvent{ return PostEvent{
Event: mp,
Event: event,
IsEmergency: isEmergency, IsEmergency: isEmergency,
DontMerge: dontMerge, DontMerge: dontMerge,
}, nil
}
func (cli *Client) PostEvent(event any, isEmergency bool, dontMerge bool, opts ...mq.SendOption) error {
opt := mq.SendOption{
Timeout: time.Second * 30,
}
if len(opts) > 0 {
opt = opts[0]
}

body, err := NewPostEvent(event, isEmergency, dontMerge)
if err != nil {
return fmt.Errorf("new post event body failed, err: %w", err)
} }
}
func (client *Client) PostEvent(msg PostEvent) error {
return mq.Send[PostEvent](client.rabbitCli, msg)
}


return mq.Send(cli.rabbitCli, body, opt)
func init() {
mq.RegisterUnionType(scevt.EventTypeUnino)
} }

+ 4
- 14
common/pkgs/mq/scanner/event/event.go View File

@@ -5,21 +5,11 @@ import (
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


var typeResolver = serder.NewTypeNameResolver(true)
type Event interface{}


var serderOption = serder.TypedSerderOption{
TypeResolver: &typeResolver,
TypeFieldName: "@type",
}

func MapToMessage(m map[string]any) (any, error) {
return serder.TypedMapToObject(m, serderOption)
}

func MessageToMap(msg any) (map[string]any, error) {
return serder.ObjectToTypedMap(msg, serderOption)
}
var eventUnionEles = serder.NewTypeNameResolver(true)
var EventTypeUnino = serder.NewTypeUnion[Event]("@type", eventUnionEles)


func Register[T any]() { func Register[T any]() {
typeResolver.Register(myreflect.TypeOf[T]())
eventUnionEles.Register(myreflect.TypeOf[T]())
} }

+ 4
- 3
coordinator/internal/services/package.go View File

@@ -11,6 +11,7 @@ import (
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner"
scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
) )


@@ -94,7 +95,7 @@ func (svc *Service) UpdateRepPackage(msg *coormq.UpdateRepPackage) (*coormq.Upda
affectFileHashes = append(affectFileHashes, add.FileHash) affectFileHashes = append(affectFileHashes, add.FileHash)
} }


err = svc.scanner.PostEvent(scevt.NewCheckRepCount(affectFileHashes), true, true)
err = svc.scanner.PostEvent(scmq.NewPostEvent(scevt.NewCheckRepCount(affectFileHashes), true, true))
if err != nil { if err != nil {
logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error()) logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error())
} }
@@ -170,7 +171,7 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack
// 不追求及时、准确 // 不追求及时、准确
if len(stgs) == 0 { if len(stgs) == 0 {
// 如果没有被引用,直接投递CheckPackage的任务 // 如果没有被引用,直接投递CheckPackage的任务
err := svc.scanner.PostEvent(scevt.NewCheckPackage([]int64{msg.PackageID}), false, false)
err := svc.scanner.PostEvent(scmq.NewPostEvent(scevt.NewCheckPackage([]int64{msg.PackageID}), false, false))
if err != nil { if err != nil {
logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error())
} }
@@ -179,7 +180,7 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack
} else { } else {
// 有引用则让Agent去检查StoragePackage // 有引用则让Agent去检查StoragePackage
for _, stg := range stgs { for _, stg := range stgs {
err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int64{msg.PackageID}), false, false)
err := svc.scanner.PostEvent(scmq.NewPostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int64{msg.PackageID}), false, false))
if err != nil { if err != nil {
logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error())
} }


+ 3
- 8
magefiles/main.go View File

@@ -17,24 +17,19 @@ const (
BuildDir = "./build" BuildDir = "./build"
) )


var Global = struct {
OS string
Arch string
}{}

// [配置项]设置编译平台为windows // [配置项]设置编译平台为windows
func Win() { func Win() {
Global.OS = "win"
magefiles.Global.OS = "win"
} }


// [配置项]设置编译平台为linux // [配置项]设置编译平台为linux
func Linux() { func Linux() {
Global.OS = "linux"
magefiles.Global.OS = "linux"
} }


// [配置项]设置编译架构为amd64 // [配置项]设置编译架构为amd64
func AMD64() { func AMD64() {
Global.Arch = "amd64"
magefiles.Global.Arch = "amd64"
} }


func All() error { func All() error {


+ 2
- 1
scanner/internal/event/event.go View File

@@ -8,6 +8,7 @@ import (
event "gitlink.org.cn/cloudream/common/pkgs/event" event "gitlink.org.cn/cloudream/common/pkgs/event"
"gitlink.org.cn/cloudream/common/pkgs/typedispatcher" "gitlink.org.cn/cloudream/common/pkgs/typedispatcher"
mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db" mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db"
scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
) )


type ExecuteArgs struct { type ExecuteArgs struct {
@@ -32,7 +33,7 @@ func NewExecutor(db *mydb.DB, distLock *distlocksvc.Service) Executor {


var msgDispatcher = typedispatcher.NewTypeDispatcher[Event]() var msgDispatcher = typedispatcher.NewTypeDispatcher[Event]()


func FromMessage(msg any) (Event, error) {
func FromMessage(msg scevt.Event) (Event, error) {
event, ok := msgDispatcher.Dispatch(msg) event, ok := msgDispatcher.Dispatch(msg)
if !ok { if !ok {
return nil, fmt.Errorf("unknow event message type: %s", reflect.TypeOf(msg).Name()) return nil, fmt.Errorf("unknow event message type: %s", reflect.TypeOf(msg).Name())


+ 1
- 9
scanner/internal/services/event.go View File

@@ -3,19 +3,11 @@ package services
import ( import (
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner"
scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event"
"gitlink.org.cn/cloudream/storage/scanner/internal/event" "gitlink.org.cn/cloudream/storage/scanner/internal/event"
) )


func (svc *Service) PostEvent(msg *scmq.PostEvent) { func (svc *Service) PostEvent(msg *scmq.PostEvent) {

evtMsg, err := scevt.MapToMessage(msg.Event)
if err != nil {
logger.Warnf("convert map to event message failed, err: %s", err.Error())
return
}

evt, err := event.FromMessage(evtMsg)
evt, err := event.FromMessage(msg.Event)
if err != nil { if err != nil {
logger.Warnf("create event from event message failed, err: %s", err.Error()) logger.Warnf("create event from event message failed, err: %s", err.Error())
return return


Loading…
Cancel
Save