Browse Source

scanner和agent启动EventExecutor

gitlink
Sydonian 2 years ago
parent
commit
89d47f88ca
9 changed files with 176 additions and 42 deletions
  1. +15
    -0
      Makefile
  2. +2
    -2
      internal/event/agent_check_cache.go
  3. +2
    -2
      internal/event/agent_check_state.go
  4. +2
    -2
      internal/event/agent_check_storage.go
  5. +43
    -0
      internal/event/event.go
  6. +0
    -16
      internal/event/task.go
  7. +28
    -0
      internal/services/event.go
  8. +15
    -0
      internal/services/service.go
  9. +69
    -20
      main.go

+ 15
- 0
Makefile View File

@@ -0,0 +1,15 @@
OUTPUT_BINARY_NAME = "scanner"
OUTPUT_DIR_NAME = "scanner"


ASSETS_DIR_NAME = "assets"
BUILD_DIR = "../../build"

build:
go build -o ${BUILD_DIR}/${OUTPUT_DIR_NAME}/${OUTPUT_BINARY_NAME}
@if [ -d ${ASSETS_DIR_NAME} ] && [ -n "`ls -A ${ASSETS_DIR_NAME}`" ] ;then \
cp -r ${ASSETS_DIR_NAME}/* ${BUILD_DIR}/${OUTPUT_DIR_NAME}/; \
fi

clean:
rm -f ${BUILD_DIR}/${OUTPUT_DIR_NAME}/${OUTPUT_BINARY_NAME}

+ 2
- 2
internal/event/agent_check_cache.go View File

@@ -11,7 +11,7 @@ import (

agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/event"
agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event"
)

type AgentCheckCache struct {
@@ -84,7 +84,7 @@ func (t *AgentCheckCache) Execute(execCtx ExecuteContext) {
defer agentClient.Close()

err = agentClient.PostEvent(agtmsg.NewPostEventBody(
agttsk.NewCheckCache(isComplete, caches),
agtevt.NewCheckCache(isComplete, caches),
execCtx.Option.IsEmergency, // 继承本任务的执行选项
execCtx.Option.DontMerge))
if err != nil {


+ 2
- 2
internal/event/agent_check_state.go View File

@@ -11,7 +11,7 @@ import (
mysql "gitlink.org.cn/cloudream/db/sql"
agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/event"
agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event"
"gitlink.org.cn/cloudream/scanner/internal/config"
)

@@ -78,7 +78,7 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) {
defer agentClient.Close()

// 紧急任务
err = agentClient.PostEvent(agtmsg.NewPostEventBody(agttsk.NewCheckState(), true, true))
err = agentClient.PostEvent(agtmsg.NewPostEventBody(agtevt.NewCheckState(), true, true))
if err != nil {
logger.WithField("NodeID", nodeID).Warnf("request to agent failed, err: %s", err.Error())
}


+ 2
- 2
internal/event/agent_check_storage.go View File

@@ -10,7 +10,7 @@ import (
mysql "gitlink.org.cn/cloudream/db/sql"
agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/event"
agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event"
"gitlink.org.cn/cloudream/scanner/internal/config"
)

@@ -99,7 +99,7 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) {
defer agentClient.Close()

err = agentClient.PostEvent(agtmsg.NewPostEventBody(
agttsk.NewCheckStorage(stg.Directory, isComplete, objects),
agtevt.NewCheckStorage(stg.Directory, isComplete, objects),
execCtx.Option.IsEmergency, // 继承本任务的执行选项
execCtx.Option.DontMerge))
if err != nil {


+ 43
- 0
internal/event/event.go View File

@@ -0,0 +1,43 @@
package event

import (
"fmt"
"reflect"

event "gitlink.org.cn/cloudream/common/pkg/event"
"gitlink.org.cn/cloudream/common/pkg/typedispatcher"
mydb "gitlink.org.cn/cloudream/db"
)

type ExecuteArgs struct {
DB *mydb.DB
}

type Executor = event.Executor[ExecuteArgs]

type ExecuteContext = event.ExecuteContext[ExecuteArgs]

type Event = event.Event[ExecuteArgs]

type ExecuteOption = event.ExecuteOption

func NewExecutor(db *mydb.DB) Executor {
return event.NewExecutor(ExecuteArgs{
DB: db,
})
}

var msgDispatcher typedispatcher.TypeDispatcher[Event]

func FromMessage(msg any) (Event, error) {
event, ok := msgDispatcher.Dispatch(msg)
if !ok {
return nil, fmt.Errorf("unknow event message type: %s", reflect.TypeOf(msg).Name())
}

return event, nil
}

func Register[T any](converter func(msg T) Event) {
typedispatcher.Add(msgDispatcher, converter)
}

+ 0
- 16
internal/event/task.go View File

@@ -1,16 +0,0 @@
package event

import (
event "gitlink.org.cn/cloudream/common/pkg/event"
mydb "gitlink.org.cn/cloudream/db"
)

type ExecuteArgs struct {
DB *mydb.DB
}

type Executor = event.Executor[ExecuteArgs]

type ExecuteContext = event.ExecuteContext[ExecuteArgs]

type Event = event.Event[ExecuteArgs]

+ 28
- 0
internal/services/event.go View File

@@ -0,0 +1,28 @@
package services

import (
"gitlink.org.cn/cloudream/common/utils/logger"
scmsg "gitlink.org.cn/cloudream/rabbitmq/message/scanner"
scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
"gitlink.org.cn/cloudream/scanner/internal/event"
)

func (svc *Service) PostEvent(msg *scmsg.PostEvent) {

evtMsg, err := scevt.MapToMessage(msg.Body.Event.(map[string]any))
if err != nil {
logger.Warnf("convert map to event message failed, err: %s", err.Error())
return
}

evt, err := event.FromMessage(evtMsg)
if err != nil {
logger.Warnf("create event from event message failed, err: %s", err.Error())
return
}

svc.eventExecutor.Post(evt, event.ExecuteOption{
IsEmergency: msg.Body.IsEmergency,
DontMerge: msg.Body.DontMerge,
})
}

+ 15
- 0
internal/services/service.go View File

@@ -0,0 +1,15 @@
package services

import (
"gitlink.org.cn/cloudream/scanner/internal/event"
)

type Service struct {
eventExecutor *event.Executor
}

func NewService(eventExecutor *event.Executor) *Service {
return &Service{
eventExecutor: eventExecutor,
}
}

+ 69
- 20
main.go View File

@@ -1,29 +1,78 @@
package main

import (
"fmt"
"os"
"sync"

//"path/filepath"
//"sync"

"time"
//agentcaller "proto"
//"github.com/pborman/uuid"
//"github.com/streadway/amqp"
//"google.golang.org/grpc"
log "gitlink.org.cn/cloudream/common/utils/logger"
"gitlink.org.cn/cloudream/db"
scsvr "gitlink.org.cn/cloudream/rabbitmq/server/scanner"
"gitlink.org.cn/cloudream/scanner/internal/config"
"gitlink.org.cn/cloudream/scanner/internal/event"
"gitlink.org.cn/cloudream/scanner/internal/services"
)

func main() {
for {
//jh:遍历对象副本表,
//-对于每一个rephash,
//--根据objectId查询对象表中的RepNum
//--查询缓存表中rephash对应的TempOrPin为false的nodeIp
//--如果查到的NodeIp数少于RepNum,需发起复制命令
//jh:遍历对象编码块表,
//-对于每一个blockhash,获得其blockId、objectId、innerID
//--判断blockhash是否在ipfs网络中
//--得到待修复object清单:记录下各个objectId对应的不在ipfs网络中的blockId、blockhash、innerID
//-查询待修复object清单中各个object的FileSize、EcName等,并发出修复命令
time.Sleep(time.Minute * 5)
err := config.Init()
if err != nil {
fmt.Printf("init config failed, err: %s", err.Error())
os.Exit(1)
}

err = log.Init(&config.Cfg().Logger)
if err != nil {
fmt.Printf("init logger failed, err: %s", err.Error())
os.Exit(1)
}

db, err := db.NewDB(&config.Cfg().DB)
if err != nil {
log.Fatalf("new db failed, err: %s", err.Error())
}

wg := sync.WaitGroup{}
wg.Add(2)

eventExecutor := event.NewExecutor(db)
go serveEventExecutor(&eventExecutor, &wg)

agtSvr, err := scsvr.NewScannerServer(services.NewService(&eventExecutor), &config.Cfg().RabbitMQ)
if err != nil {
log.Fatalf("new agent server failed, err: %s", err.Error())
}
agtSvr.OnError = func(err error) {
log.Warnf("agent server err: %s", err.Error())
}
go serveScannerServer(agtSvr, &wg)

wg.Wait()
}

func serveEventExecutor(executor *event.Executor, wg *sync.WaitGroup) {
log.Info("start serving event executor")

err := executor.Execute()

if err != nil {
log.Errorf("event executor stopped with error: %s", err.Error())
}

log.Info("event executor stopped")

wg.Done()
}

func serveScannerServer(server *scsvr.ScannerServer, wg *sync.WaitGroup) {
log.Info("start serving scanner server")

err := server.Serve()

if err != nil {
log.Errorf("scanner server stopped with error: %s", err.Error())
}

log.Info("scanner server stopped")

wg.Done()
}

Loading…
Cancel
Save