diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..a8bb2e8 --- /dev/null +++ b/Makefile @@ -0,0 +1,15 @@ +OUTPUT_BINARY_NAME = "scanner" +OUTPUT_DIR_NAME = "scanner" + + +ASSETS_DIR_NAME = "assets" +BUILD_DIR = "../../build" + +build: + go build -o ${BUILD_DIR}/${OUTPUT_DIR_NAME}/${OUTPUT_BINARY_NAME} + @if [ -d ${ASSETS_DIR_NAME} ] && [ -n "`ls -A ${ASSETS_DIR_NAME}`" ] ;then \ + cp -r ${ASSETS_DIR_NAME}/* ${BUILD_DIR}/${OUTPUT_DIR_NAME}/; \ + fi + +clean: + rm -f ${BUILD_DIR}/${OUTPUT_DIR_NAME}/${OUTPUT_BINARY_NAME} \ No newline at end of file diff --git a/internal/event/agent_check_cache.go b/internal/event/agent_check_cache.go index c895bcc..942ec5a 100644 --- a/internal/event/agent_check_cache.go +++ b/internal/event/agent_check_cache.go @@ -11,7 +11,7 @@ import ( agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" - agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/event" + agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event" ) type AgentCheckCache struct { @@ -84,7 +84,7 @@ func (t *AgentCheckCache) Execute(execCtx ExecuteContext) { defer agentClient.Close() err = agentClient.PostEvent(agtmsg.NewPostEventBody( - agttsk.NewCheckCache(isComplete, caches), + agtevt.NewCheckCache(isComplete, caches), execCtx.Option.IsEmergency, // 继承本任务的执行选项 execCtx.Option.DontMerge)) if err != nil { diff --git a/internal/event/agent_check_state.go b/internal/event/agent_check_state.go index ec1e947..6df6bcc 100644 --- a/internal/event/agent_check_state.go +++ b/internal/event/agent_check_state.go @@ -11,7 +11,7 @@ import ( mysql "gitlink.org.cn/cloudream/db/sql" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" - agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/event" + agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event" "gitlink.org.cn/cloudream/scanner/internal/config" ) @@ -78,7 +78,7 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { defer agentClient.Close() // 紧急任务 - err = agentClient.PostEvent(agtmsg.NewPostEventBody(agttsk.NewCheckState(), true, true)) + err = agentClient.PostEvent(agtmsg.NewPostEventBody(agtevt.NewCheckState(), true, true)) if err != nil { logger.WithField("NodeID", nodeID).Warnf("request to agent failed, err: %s", err.Error()) } diff --git a/internal/event/agent_check_storage.go b/internal/event/agent_check_storage.go index 05a0705..05dbba1 100644 --- a/internal/event/agent_check_storage.go +++ b/internal/event/agent_check_storage.go @@ -10,7 +10,7 @@ import ( mysql "gitlink.org.cn/cloudream/db/sql" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" - agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/event" + agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event" "gitlink.org.cn/cloudream/scanner/internal/config" ) @@ -99,7 +99,7 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) { defer agentClient.Close() err = agentClient.PostEvent(agtmsg.NewPostEventBody( - agttsk.NewCheckStorage(stg.Directory, isComplete, objects), + agtevt.NewCheckStorage(stg.Directory, isComplete, objects), execCtx.Option.IsEmergency, // 继承本任务的执行选项 execCtx.Option.DontMerge)) if err != nil { diff --git a/internal/event/event.go b/internal/event/event.go new file mode 100644 index 0000000..f64ed98 --- /dev/null +++ b/internal/event/event.go @@ -0,0 +1,43 @@ +package event + +import ( + "fmt" + "reflect" + + event "gitlink.org.cn/cloudream/common/pkg/event" + "gitlink.org.cn/cloudream/common/pkg/typedispatcher" + mydb "gitlink.org.cn/cloudream/db" +) + +type ExecuteArgs struct { + DB *mydb.DB +} + +type Executor = event.Executor[ExecuteArgs] + +type ExecuteContext = event.ExecuteContext[ExecuteArgs] + +type Event = event.Event[ExecuteArgs] + +type ExecuteOption = event.ExecuteOption + +func NewExecutor(db *mydb.DB) Executor { + return event.NewExecutor(ExecuteArgs{ + DB: db, + }) +} + +var msgDispatcher typedispatcher.TypeDispatcher[Event] + +func FromMessage(msg any) (Event, error) { + event, ok := msgDispatcher.Dispatch(msg) + if !ok { + return nil, fmt.Errorf("unknow event message type: %s", reflect.TypeOf(msg).Name()) + } + + return event, nil +} + +func Register[T any](converter func(msg T) Event) { + typedispatcher.Add(msgDispatcher, converter) +} diff --git a/internal/event/task.go b/internal/event/task.go deleted file mode 100644 index a1bd1f5..0000000 --- a/internal/event/task.go +++ /dev/null @@ -1,16 +0,0 @@ -package event - -import ( - event "gitlink.org.cn/cloudream/common/pkg/event" - mydb "gitlink.org.cn/cloudream/db" -) - -type ExecuteArgs struct { - DB *mydb.DB -} - -type Executor = event.Executor[ExecuteArgs] - -type ExecuteContext = event.ExecuteContext[ExecuteArgs] - -type Event = event.Event[ExecuteArgs] diff --git a/internal/services/event.go b/internal/services/event.go new file mode 100644 index 0000000..3d57a42 --- /dev/null +++ b/internal/services/event.go @@ -0,0 +1,28 @@ +package services + +import ( + "gitlink.org.cn/cloudream/common/utils/logger" + scmsg "gitlink.org.cn/cloudream/rabbitmq/message/scanner" + scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" + "gitlink.org.cn/cloudream/scanner/internal/event" +) + +func (svc *Service) PostEvent(msg *scmsg.PostEvent) { + + evtMsg, err := scevt.MapToMessage(msg.Body.Event.(map[string]any)) + if err != nil { + logger.Warnf("convert map to event message failed, err: %s", err.Error()) + return + } + + evt, err := event.FromMessage(evtMsg) + if err != nil { + logger.Warnf("create event from event message failed, err: %s", err.Error()) + return + } + + svc.eventExecutor.Post(evt, event.ExecuteOption{ + IsEmergency: msg.Body.IsEmergency, + DontMerge: msg.Body.DontMerge, + }) +} diff --git a/internal/services/service.go b/internal/services/service.go new file mode 100644 index 0000000..9c5af8a --- /dev/null +++ b/internal/services/service.go @@ -0,0 +1,15 @@ +package services + +import ( + "gitlink.org.cn/cloudream/scanner/internal/event" +) + +type Service struct { + eventExecutor *event.Executor +} + +func NewService(eventExecutor *event.Executor) *Service { + return &Service{ + eventExecutor: eventExecutor, + } +} diff --git a/main.go b/main.go index b9b5171..5bd1d0f 100644 --- a/main.go +++ b/main.go @@ -1,29 +1,78 @@ package main import ( + "fmt" + "os" + "sync" - //"path/filepath" - //"sync" - - "time" - //agentcaller "proto" - //"github.com/pborman/uuid" - //"github.com/streadway/amqp" - //"google.golang.org/grpc" + log "gitlink.org.cn/cloudream/common/utils/logger" + "gitlink.org.cn/cloudream/db" + scsvr "gitlink.org.cn/cloudream/rabbitmq/server/scanner" + "gitlink.org.cn/cloudream/scanner/internal/config" + "gitlink.org.cn/cloudream/scanner/internal/event" + "gitlink.org.cn/cloudream/scanner/internal/services" ) func main() { - for { - //jh:遍历对象副本表, - //-对于每一个rephash, - //--根据objectId查询对象表中的RepNum - //--查询缓存表中rephash对应的TempOrPin为false的nodeIp - //--如果查到的NodeIp数少于RepNum,需发起复制命令 - //jh:遍历对象编码块表, - //-对于每一个blockhash,获得其blockId、objectId、innerID - //--判断blockhash是否在ipfs网络中 - //--得到待修复object清单:记录下各个objectId对应的不在ipfs网络中的blockId、blockhash、innerID - //-查询待修复object清单中各个object的FileSize、EcName等,并发出修复命令 - time.Sleep(time.Minute * 5) + err := config.Init() + if err != nil { + fmt.Printf("init config failed, err: %s", err.Error()) + os.Exit(1) + } + + err = log.Init(&config.Cfg().Logger) + if err != nil { + fmt.Printf("init logger failed, err: %s", err.Error()) + os.Exit(1) } + + db, err := db.NewDB(&config.Cfg().DB) + if err != nil { + log.Fatalf("new db failed, err: %s", err.Error()) + } + + wg := sync.WaitGroup{} + wg.Add(2) + + eventExecutor := event.NewExecutor(db) + go serveEventExecutor(&eventExecutor, &wg) + + agtSvr, err := scsvr.NewScannerServer(services.NewService(&eventExecutor), &config.Cfg().RabbitMQ) + if err != nil { + log.Fatalf("new agent server failed, err: %s", err.Error()) + } + agtSvr.OnError = func(err error) { + log.Warnf("agent server err: %s", err.Error()) + } + go serveScannerServer(agtSvr, &wg) + + wg.Wait() +} + +func serveEventExecutor(executor *event.Executor, wg *sync.WaitGroup) { + log.Info("start serving event executor") + + err := executor.Execute() + + if err != nil { + log.Errorf("event executor stopped with error: %s", err.Error()) + } + + log.Info("event executor stopped") + + wg.Done() +} + +func serveScannerServer(server *scsvr.ScannerServer, wg *sync.WaitGroup) { + log.Info("start serving scanner server") + + err := server.Serve() + + if err != nil { + log.Errorf("scanner server stopped with error: %s", err.Error()) + } + + log.Info("scanner server stopped") + + wg.Done() }