package machinery import ( "errors" "fmt" "os" "os/signal" "syscall" "time" "github.com/RichardKnop/machinery/v1/backends/amqp" "github.com/RichardKnop/machinery/v1/log" "github.com/RichardKnop/machinery/v1/retry" "github.com/RichardKnop/machinery/v1/tasks" "github.com/RichardKnop/machinery/v1/tracing" "github.com/opentracing/opentracing-go" ) // Worker represents a single worker process type Worker struct { server *Server ConsumerTag string Concurrency int Queue string errorHandler func(err error) preTaskHandler func(*tasks.Signature) postTaskHandler func(*tasks.Signature) } // Launch starts a new worker process. The worker subscribes // to the default queue and processes incoming registered tasks func (worker *Worker) Launch() error { errorsChan := make(chan error) worker.LaunchAsync(errorsChan) return <-errorsChan } // LaunchAsync is a non blocking version of Launch func (worker *Worker) LaunchAsync(errorsChan chan<- error) { cnf := worker.server.GetConfig() broker := worker.server.GetBroker() // Log some useful information about worker configuration log.INFO.Printf("Launching a worker with the following settings:") log.INFO.Printf("- Broker: %s", cnf.Broker) if worker.Queue == "" { log.INFO.Printf("- DefaultQueue: %s", cnf.DefaultQueue) } else { log.INFO.Printf("- CustomQueue: %s", worker.Queue) } log.INFO.Printf("- ResultBackend: %s", cnf.ResultBackend) if cnf.AMQP != nil { log.INFO.Printf("- AMQP: %s", cnf.AMQP.Exchange) log.INFO.Printf(" - Exchange: %s", cnf.AMQP.Exchange) log.INFO.Printf(" - ExchangeType: %s", cnf.AMQP.ExchangeType) log.INFO.Printf(" - BindingKey: %s", cnf.AMQP.BindingKey) log.INFO.Printf(" - PrefetchCount: %d", cnf.AMQP.PrefetchCount) } // Goroutine to start broker consumption and handle retries when broker connection dies go func() { for { retry, err := broker.StartConsuming(worker.ConsumerTag, worker.Concurrency, worker) if retry { if worker.errorHandler != nil { worker.errorHandler(err) } else { log.WARNING.Printf("Broker failed with error: %s", err) } } else { errorsChan <- err // stop the goroutine return } } }() if !cnf.NoUnixSignals { sig := make(chan os.Signal, 1) signal.Notify(sig, os.Interrupt, syscall.SIGTERM) var signalsReceived uint // Goroutine Handle SIGINT and SIGTERM signals go func() { for { select { case s := <-sig: log.WARNING.Printf("Signal received: %v", s) signalsReceived++ if signalsReceived < 2 { // After first Ctrl+C start quitting the worker gracefully log.WARNING.Print("Waiting for running tasks to finish before shutting down") go func() { worker.Quit() errorsChan <- errors.New("Worker quit gracefully") }() } else { // Abort the program when user hits Ctrl+C second time in a row errorsChan <- errors.New("Worker quit abruptly") } } } }() } } // CustomQueue returns Custom Queue of the running worker process func (worker *Worker) CustomQueue() string { return worker.Queue } // Quit tears down the running worker process func (worker *Worker) Quit() { worker.server.GetBroker().StopConsuming() } // Process handles received tasks and triggers success/error callbacks func (worker *Worker) Process(signature *tasks.Signature) error { // If the task is not registered with this worker, do not continue // but only return nil as we do not want to restart the worker process if !worker.server.IsTaskRegistered(signature.Name) { return nil } taskFunc, err := worker.server.GetRegisteredTask(signature.Name) if err != nil { return nil } // Update task state to RECEIVED if err = worker.server.GetBackend().SetStateReceived(signature); err != nil { return fmt.Errorf("Set state to 'received' for task %s returned error: %s", signature.UUID, err) } // Prepare task for processing task, err := tasks.NewWithSignature(taskFunc, signature) // if this failed, it means the task is malformed, probably has invalid // signature, go directly to task failed without checking whether to retry if err != nil { worker.taskFailed(signature, err) return err } // try to extract trace span from headers and add it to the function context // so it can be used inside the function if it has context.Context as the first // argument. Start a new span if it isn't found. taskSpan := tracing.StartSpanFromHeaders(signature.Headers, signature.Name) tracing.AnnotateSpanWithSignatureInfo(taskSpan, signature) task.Context = opentracing.ContextWithSpan(task.Context, taskSpan) // Update task state to STARTED if err = worker.server.GetBackend().SetStateStarted(signature); err != nil { return fmt.Errorf("Set state to 'started' for task %s returned error: %s", signature.UUID, err) } //Run handler before the task is called if worker.preTaskHandler != nil { worker.preTaskHandler(signature) } //Defer run handler for the end of the task if worker.postTaskHandler != nil { defer worker.postTaskHandler(signature) } // Call the task results, err := task.Call() if err != nil { // If a tasks.ErrRetryTaskLater was returned from the task, // retry the task after specified duration retriableErr, ok := interface{}(err).(tasks.ErrRetryTaskLater) if ok { return worker.retryTaskIn(signature, retriableErr.RetryIn()) } // Otherwise, execute default retry logic based on signature.RetryCount // and signature.RetryTimeout values if signature.RetryCount > 0 { return worker.taskRetry(signature) } return worker.taskFailed(signature, err) } return worker.taskSucceeded(signature, results) } // retryTask decrements RetryCount counter and republishes the task to the queue func (worker *Worker) taskRetry(signature *tasks.Signature) error { // Update task state to RETRY if err := worker.server.GetBackend().SetStateRetry(signature); err != nil { return fmt.Errorf("Set state to 'retry' for task %s returned error: %s", signature.UUID, err) } // Decrement the retry counter, when it reaches 0, we won't retry again signature.RetryCount-- // Increase retry timeout signature.RetryTimeout = retry.FibonacciNext(signature.RetryTimeout) // Delay task by signature.RetryTimeout seconds eta := time.Now().UTC().Add(time.Second * time.Duration(signature.RetryTimeout)) signature.ETA = &eta log.WARNING.Printf("Task %s failed. Going to retry in %d seconds.", signature.UUID, signature.RetryTimeout) // Send the task back to the queue _, err := worker.server.SendTask(signature) return err } // taskRetryIn republishes the task to the queue with ETA of now + retryIn.Seconds() func (worker *Worker) retryTaskIn(signature *tasks.Signature, retryIn time.Duration) error { // Update task state to RETRY if err := worker.server.GetBackend().SetStateRetry(signature); err != nil { return fmt.Errorf("Set state to 'retry' for task %s returned error: %s", signature.UUID, err) } // Delay task by retryIn duration eta := time.Now().UTC().Add(retryIn) signature.ETA = &eta log.WARNING.Printf("Task %s failed. Going to retry in %.0f seconds.", signature.UUID, retryIn.Seconds()) // Send the task back to the queue _, err := worker.server.SendTask(signature) return err } // taskSucceeded updates the task state and triggers success callbacks or a // chord callback if this was the last task of a group with a chord callback func (worker *Worker) taskSucceeded(signature *tasks.Signature, taskResults []*tasks.TaskResult) error { // Update task state to SUCCESS if err := worker.server.GetBackend().SetStateSuccess(signature, taskResults); err != nil { return fmt.Errorf("Set state to 'success' for task %s returned error: %s", signature.UUID, err) } // Log human readable results of the processed task var debugResults = "[]" results, err := tasks.ReflectTaskResults(taskResults) if err != nil { log.WARNING.Print(err) } else { debugResults = tasks.HumanReadableResults(results) } log.DEBUG.Printf("Processed task %s. Results = %s", signature.UUID, debugResults) // Trigger success callbacks for _, successTask := range signature.OnSuccess { if signature.Immutable == false { // Pass results of the task to success callbacks for _, taskResult := range taskResults { successTask.Args = append(successTask.Args, tasks.Arg{ Type: taskResult.Type, Value: taskResult.Value, }) } } worker.server.SendTask(successTask) } // If the task was not part of a group, just return if signature.GroupUUID == "" { return nil } // Check if all task in the group has completed groupCompleted, err := worker.server.GetBackend().GroupCompleted( signature.GroupUUID, signature.GroupTaskCount, ) if err != nil { return fmt.Errorf("Completed check for group %s returned error: %s", signature.GroupUUID, err) } // If the group has not yet completed, just return if !groupCompleted { return nil } // Defer purging of group meta queue if we are using AMQP backend if worker.hasAMQPBackend() { defer worker.server.GetBackend().PurgeGroupMeta(signature.GroupUUID) } // There is no chord callback, just return if signature.ChordCallback == nil { return nil } // Trigger chord callback shouldTrigger, err := worker.server.GetBackend().TriggerChord(signature.GroupUUID) if err != nil { return fmt.Errorf("Triggering chord for group %s returned error: %s", signature.GroupUUID, err) } // Chord has already been triggered if !shouldTrigger { return nil } // Get task states taskStates, err := worker.server.GetBackend().GroupTaskStates( signature.GroupUUID, signature.GroupTaskCount, ) if err != nil { return nil } // Append group tasks' return values to chord task if it's not immutable for _, taskState := range taskStates { if !taskState.IsSuccess() { return nil } if signature.ChordCallback.Immutable == false { // Pass results of the task to the chord callback for _, taskResult := range taskState.Results { signature.ChordCallback.Args = append(signature.ChordCallback.Args, tasks.Arg{ Type: taskResult.Type, Value: taskResult.Value, }) } } } // Send the chord task _, err = worker.server.SendTask(signature.ChordCallback) if err != nil { return err } return nil } // taskFailed updates the task state and triggers error callbacks func (worker *Worker) taskFailed(signature *tasks.Signature, taskErr error) error { // Update task state to FAILURE if err := worker.server.GetBackend().SetStateFailure(signature, taskErr.Error()); err != nil { return fmt.Errorf("Set state to 'failure' for task %s returned error: %s", signature.UUID, err) } if worker.errorHandler != nil { worker.errorHandler(taskErr) } else { log.ERROR.Printf("Failed processing task %s. Error = %v", signature.UUID, taskErr) } // Trigger error callbacks for _, errorTask := range signature.OnError { // Pass error as a first argument to error callbacks args := append([]tasks.Arg{{ Type: "string", Value: taskErr.Error(), }}, errorTask.Args...) errorTask.Args = args worker.server.SendTask(errorTask) } return nil } // Returns true if the worker uses AMQP backend func (worker *Worker) hasAMQPBackend() bool { _, ok := worker.server.GetBackend().(*amqp.Backend) return ok } // SetErrorHandler sets a custom error handler for task errors // A default behavior is just to log the error after all the retry attempts fail func (worker *Worker) SetErrorHandler(handler func(err error)) { worker.errorHandler = handler } //SetPreTaskHandler sets a custom handler func before a job is started func (worker *Worker) SetPreTaskHandler(handler func(*tasks.Signature)) { worker.preTaskHandler = handler } //SetPostTaskHandler sets a custom handler for the end of a job func (worker *Worker) SetPostTaskHandler(handler func(*tasks.Signature)) { worker.postTaskHandler = handler } //GetServer returns server func (worker *Worker) GetServer() *Server { return worker.server }