|
- // Copyright 2025 The HuaTuo Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
-
- package tracing
-
- import (
- "context"
- "crypto/rand"
- "errors"
- "fmt"
- "math/big"
- "os/exec"
- "sync"
- "time"
-
- "huatuo-bamai/internal/log"
- "huatuo-bamai/internal/storage"
- )
-
- // Status represents the status of a task.
- type Status string
-
- const (
- // StatusCompleted represents a task that has finished executing successfully.
- StatusCompleted = "completed"
- // StatusFailed represents a task that encountered an error during execution.
- StatusFailed = "failed"
- // StatusPending represents a task that has been created but has not yet begun execution.
- StatusPending = "pending"
- // StatusRunning represents a task that is currently being executed.
- StatusRunning = "running"
- // StatusNotExist represents a task that has either never been created or has been removed.
- StatusNotExist = "not_exist"
- )
-
- type TaskStorageType int
-
- const (
- TaskStorageDB TaskStorageType = iota + 1
- TaskStorageStdout
- TaskStorageLocal
- )
-
- type TaskResult struct {
- TaskStatus Status
- TaskData []byte
- TaskErr error
- }
-
- // task represents a unit of work to be executed.
- type task struct {
- id string // Unique identifier for the task.
- execBinary string // Path to the executable file to run for this task.
- execArgs []string // Arguments to pass to the executable.
- stdoutData []byte // Data generated by the task.
- status Status // Current status of the task.
- error error // Error encountered during task execution.
- storage TaskStorageType // Type of data produced by the task.
- cancelFunc context.CancelFunc // Function to cancel the task.
- deadlineTime time.Time // Time after which the task will be automatically deleted.
- }
-
- var (
- taskLifeTmpCache sync.Map
- // ErrTaskNotFound Error returned when a task is not found.
- ErrTaskNotFound = errors.New("task not found")
- // ErrTaskTimeout Error returned when a task times out.
- ErrTaskTimeout = errors.New("task timeout")
- // ErrTaskCanceled Error returned when a task is canceled.
- ErrTaskCanceled = errors.New("task canceled")
- )
-
- func init() {
- go tasksGarbageCollect()
- }
-
- func tasksGarbageCollect() {
- ticker := time.NewTicker(time.Second * 10)
- defer ticker.Stop()
-
- for range ticker.C {
- now := time.Now()
- taskLifeTmpCache.Range(func(key, value any) bool {
- task := value.(*task)
- if task.status == StatusCompleted || task.status == StatusFailed {
- if now.After(task.deadlineTime) {
- log.Infof("task %s deleted by timeout", key)
- taskLifeTmpCache.Delete(key)
- }
- }
- return true
- })
- }
- }
-
- func allocTaskID() string {
- const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
- const length = 16
- result := make([]byte, length)
- charsetLength := big.NewInt(int64(len(charset)))
-
- for i := range result {
- num, err := rand.Int(rand.Reader, charsetLength)
- if err != nil {
- panic("Failed to generate random number")
- }
- result[i] = charset[num.Int64()]
- }
-
- return string(result)
- }
-
- // NewTask creates a new task, allocates an ID, and starts it.
- func NewTask(execBinary string, timeout time.Duration, storageType TaskStorageType, execArgs []string) string {
- taskID := allocTaskID()
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- task := &task{
- id: taskID,
- status: StatusPending,
- cancelFunc: cancel,
- execBinary: execBinary,
- storage: storageType,
- execArgs: execArgs,
- }
- taskLifeTmpCache.Store(taskID, task)
-
- go runTask(ctx, task)
-
- return taskID
- }
-
- func runTask(ctx context.Context, task *task) {
- defer func() {
- setDeadlineDefault(task)
- }()
-
- task.status = StatusRunning
- log.Infof("task %s %s started", task.execBinary, task.id)
-
- cmd := exec.CommandContext(ctx, task.execBinary, task.execArgs...)
- output, err := cmd.CombinedOutput()
- if err != nil {
- task.status = StatusFailed
- contextErr := ctx.Err()
- if errors.Is(contextErr, context.DeadlineExceeded) {
- task.error = ErrTaskTimeout
- } else if errors.Is(contextErr, context.Canceled) {
- task.error = ErrTaskCanceled
- } else {
- task.error = fmt.Errorf("task error: %s| cmd error: %s", err.Error(), string(output))
- }
- log.Infof("task %s %s failed: %s", task.execBinary, task.id, task.error.Error())
- return
- }
-
- switch task.storage {
- case TaskStorageDB:
- storage.SaveTaskOutput(task.execBinary, task.id, "", time.Now(), string(output))
- case TaskStorageStdout:
- task.stdoutData = output
- case TaskStorageLocal:
- default:
- log.Warn("not supported")
- }
-
- task.status = StatusCompleted
- log.Infof("task %s completed: %s", task.id, fmt.Sprint(task.execBinary, task.execArgs))
- }
-
- func setDeadlineDefault(task *task) {
- task.deadlineTime = time.Now().Add(10 * time.Minute)
- }
-
- // RunningTaskCount gets the number of running tasks.
- func RunningTaskCount() int {
- count := 0
- taskLifeTmpCache.Range(func(key, value any) bool {
- task := value.(*task)
- if task.status == StatusRunning {
- count++
- }
- return true
- })
- return count
- }
-
- // Result returns the result of a task given its ID.
- func Result(taskID string) *TaskResult {
- taskInterface, ok := taskLifeTmpCache.Load(taskID)
- if !ok {
- return &TaskResult{
- TaskStatus: StatusNotExist,
- TaskErr: ErrTaskNotFound,
- }
- }
-
- task := taskInterface.(*task)
- if task.status == StatusFailed || task.status == StatusCompleted {
- setDeadlineDefault(task)
- }
- return &TaskResult{
- TaskData: task.stdoutData,
- TaskStatus: task.status,
- TaskErr: task.error,
- }
- }
-
- // StopTask stops a running task given its ID.
- func StopTask(taskID string) error {
- taskAny, ok := taskLifeTmpCache.Load(taskID)
- if !ok {
- return ErrTaskNotFound
- }
-
- task := taskAny.(*task)
- if task.status == StatusRunning {
- task.cancelFunc()
- }
- taskLifeTmpCache.Delete(taskID)
- log.Infof("task %s stoped", task.id)
- return nil
- }
|