From f4260734a1ed0ccfcfb145bc425cd84d47e75763 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 22 May 2023 16:51:37 +0800 Subject: [PATCH] =?UTF-8?q?Task=E6=94=B9=E5=90=8D=E6=88=90Event?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/event/check_state.go | 27 +++++++++ internal/event/event.go | 20 +++++++ internal/task/check_state.go | 27 --------- internal/task/executor.go | 105 ---------------------------------- internal/task/task.go | 6 -- 5 files changed, 47 insertions(+), 138 deletions(-) create mode 100644 internal/event/check_state.go create mode 100644 internal/event/event.go delete mode 100644 internal/task/check_state.go delete mode 100644 internal/task/executor.go delete mode 100644 internal/task/task.go diff --git a/internal/event/check_state.go b/internal/event/check_state.go new file mode 100644 index 0000000..abb64a8 --- /dev/null +++ b/internal/event/check_state.go @@ -0,0 +1,27 @@ +package event + +import ( + "gitlink.org.cn/cloudream/agent/internal/config" + "gitlink.org.cn/cloudream/common/consts" + scmsg "gitlink.org.cn/cloudream/rabbitmq/message/scanner" + sctsk "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" +) + +type CheckState struct { +} + +func (t *CheckState) TryMerge(other Event) bool { + _, ok := other.(*CheckState) + return ok +} + +func (t *CheckState) Execute(execCtx ExecuteContext) { + ipfsStatus := consts.IPFS_STATUS_OK + + if execCtx.Args.IPFS.IsUp() { + ipfsStatus = consts.IPFS_STATUS_OK + } + + // 紧急任务 + execCtx.Args.Scanner.PostEvent(scmsg.NewPostEventBody(sctsk.NewUpdateAgentState(config.Cfg().ID, ipfsStatus), true, true)) +} diff --git a/internal/event/event.go b/internal/event/event.go new file mode 100644 index 0000000..9e4b246 --- /dev/null +++ b/internal/event/event.go @@ -0,0 +1,20 @@ +package event + +import ( + event "gitlink.org.cn/cloudream/common/pkg/event" + "gitlink.org.cn/cloudream/common/utils/ipfs" + mydb "gitlink.org.cn/cloudream/db" + sccli "gitlink.org.cn/cloudream/rabbitmq/client/scanner" +) + +type ExecuteArgs struct { + Scanner *sccli.ScannerClient + DB *mydb.DB + IPFS *ipfs.IPFS +} + +type Executor = event.Executor[ExecuteArgs] + +type ExecuteContext = event.ExecuteContext[ExecuteArgs] + +type Event = event.Event[ExecuteArgs] diff --git a/internal/task/check_state.go b/internal/task/check_state.go deleted file mode 100644 index 7bb12ed..0000000 --- a/internal/task/check_state.go +++ /dev/null @@ -1,27 +0,0 @@ -package task - -import ( - "gitlink.org.cn/cloudream/agent/internal/config" - "gitlink.org.cn/cloudream/common/consts" - scmsg "gitlink.org.cn/cloudream/rabbitmq/message/scanner" - sctsk "gitlink.org.cn/cloudream/rabbitmq/message/scanner/task" -) - -type CheckStateTask struct { -} - -func (t *CheckStateTask) TryMerge(other Task) bool { - _, ok := other.(*CheckStateTask) - return ok -} - -func (t *CheckStateTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) { - ipfsStatus := consts.IPFS_STATUS_OK - - if execCtx.IPFS.IsUp() { - ipfsStatus = consts.IPFS_STATUS_OK - } - - // 紧急任务 - execCtx.Scanner.PostTask(scmsg.NewPostTaskBody(sctsk.NewUpdateAgentState(config.Cfg().ID, ipfsStatus), true, true)) -} diff --git a/internal/task/executor.go b/internal/task/executor.go deleted file mode 100644 index 6d559cb..0000000 --- a/internal/task/executor.go +++ /dev/null @@ -1,105 +0,0 @@ -package task - -import ( - "context" - "sync" - - "github.com/zyedidia/generic/list" - "gitlink.org.cn/cloudream/common/utils/ipfs" - mydb "gitlink.org.cn/cloudream/db" - sccli "gitlink.org.cn/cloudream/rabbitmq/client/scanner" - "golang.org/x/sync/semaphore" -) - -type ExecuteOption struct { - IsEmergency bool - DontMerge bool -} -type ExecuteContext struct { - Executor *Executor - Scanner *sccli.ScannerClient - DB *mydb.DB - IPFS *ipfs.IPFS -} -type postedTask struct { - Task Task - Option ExecuteOption -} - -type Executor struct { - tasks list.List[postedTask] - locker sync.Mutex - taskSema semaphore.Weighted - execCtx *ExecuteContext -} - -func (e *Executor) Post(task Task, opts ...ExecuteOption) { - opt := ExecuteOption{ - IsEmergency: false, - DontMerge: false, - } - - if len(opts) > 0 { - opt = opts[0] - } - - e.locker.Lock() - defer e.locker.Unlock() - - // 紧急任务直接插入到队头,不进行合并 - if opt.IsEmergency { - e.tasks.PushFront(postedTask{ - Task: task, - Option: opt, - }) - e.taskSema.Release(1) - return - } - - // 合并任务 - if opt.DontMerge { - ptr := e.tasks.Front - for ptr != nil { - // 只与非紧急任务,且允许合并的任务进行合并 - if !ptr.Value.Option.IsEmergency && !ptr.Value.Option.DontMerge { - if ptr.Value.Task.TryMerge(task) { - return - } - } - - ptr = ptr.Next - } - } - - e.tasks.PushBack(postedTask{ - Task: task, - Option: opt, - }) - e.taskSema.Release(1) -} - -// Execute 开始执行任务 -func (e *Executor) Execute() error { - for { - // TODO 打印错误日志 - e.taskSema.Acquire(context.Background(), 1) - - task := e.popFrontTask() - if task == nil { - continue - } - - task.Task.Execute(e.execCtx, task.Option) - } -} - -func (e *Executor) popFrontTask() *postedTask { - e.locker.Lock() - defer e.locker.Unlock() - - if e.tasks.Front == nil { - return nil - } - - return &e.tasks.Front.Value -} diff --git a/internal/task/task.go b/internal/task/task.go deleted file mode 100644 index 40547df..0000000 --- a/internal/task/task.go +++ /dev/null @@ -1,6 +0,0 @@ -package task - -type Task interface { - TryMerge(other Task) bool // 尝试将other任务与自身合并,如果成功返回true - Execute(ctx *ExecuteContext, myOpts ExecuteOption) -}