Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Your First Handler

This guide walks you through writing your first step handler.

What is a Handler?

A Step Handler is a class (or function, in Rust) that executes business logic for a single workflow step. Handlers:

  • Receive a StepContext with input data, dependency results, and configuration
  • Perform operations (API calls, database queries, calculations)
  • Return a success or failure result for downstream steps

You can generate a handler from a template with tasker-ctl:

tasker-ctl template generate step_handler --language python --param name=ProcessOrder

Or write one from scratch using the patterns below.

Handler Anatomy by Language

Python

from tasker_core import StepContext, StepHandler, StepHandlerResult


class ProcessOrderHandler(StepHandler):
    handler_name = "process_order"
    handler_version = "1.0.0"

    def call(self, context: StepContext) -> StepHandlerResult:
        # Access input data from the task context
        input_data = context.input_data

        # Access results from upstream dependency steps
        # prev_result = context.get_dependency_result("previous_step_name")

        result = {
            "processed": True,
            "handler": "process_order",
        }

        return StepHandlerResult.success(result=result)

Ruby

require 'tasker_core'

module Handlers
  class ProcessOrderHandler < TaskerCore::StepHandler::Base
    def call(context)
      # Access input data from the task context
      input = context.input_data

      # Access results from upstream dependency steps
      # prev_result = context.get_dependency_result('previous_step_name')

      result = {
        processed: true,
        handler: 'process_order'
      }

      success(result: result)
    rescue StandardError => e
      failure(
        message: e.message,
        error_type: 'RetryableError',
        retryable: true,
        metadata: { handler: 'process_order' }
      )
    end
  end
end

TypeScript

import {
  StepHandler,
  type StepContext,
  type StepHandlerResult,
  ErrorType,
} from '@tasker-systems/tasker';

export class ProcessOrderHandler extends StepHandler {
  static handlerName = 'process_order';
  static handlerVersion = '1.0.0';

  async call(context: StepContext): Promise<StepHandlerResult> {
    try {
      // Access input data from the task context
      const inputData = context.inputData;

      // Access results from upstream dependency steps
      // const prevResult = context.getDependencyResult('previous_step_name');

      const result = {
        processed: true,
        handler: 'process_order',
      };

      return this.success(result);
    } catch (error) {
      return this.failure(
        error instanceof Error ? error.message : String(error),
        ErrorType.RETRYABLE_ERROR,
        true,
      );
    }
  }
}

Rust

#![allow(unused)]
fn main() {
use anyhow::Result;
use async_trait::async_trait;
use serde_json::json;
use std::time::Instant;
use tasker_shared::messaging::StepExecutionResult;
use tasker_shared::types::TaskSequenceStep;
use tasker_worker_rust::{success_result, RustStepHandler};
use tasker_worker_rust::step_handlers::StepHandlerConfig;

pub struct ProcessOrderHandler {
    config: StepHandlerConfig,
}

#[async_trait]
impl RustStepHandler for ProcessOrderHandler {
    fn new(config: StepHandlerConfig) -> Self {
        Self { config }
    }

    fn name(&self) -> &str {
        "process_order"
    }

    async fn call(
        &self,
        step_data: &TaskSequenceStep,
    ) -> Result<StepExecutionResult> {
        let start = Instant::now();

        // Access input data from the task context
        let _input_data = &step_data.task.context;

        // Access dependency results from upstream steps
        // let _dep_results = &step_data.dependency_results;

        let result_data = json!({
            "processed": true,
            "handler": "process_order"
        });

        let duration_ms = start.elapsed().as_millis() as i64;

        Ok(success_result(
            step_data.workflow_step.workflow_step_uuid,
            result_data,
            duration_ms,
            None,
        ))
    }
}
}

Key Patterns

ConceptPythonRubyTypeScriptRust
Input datacontext.input_datacontext.input_datacontext.inputDatastep_data.task.context
Dependency resultcontext.get_dependency_result("step")context.get_dependency_result('step')context.getDependencyResult('step')step_data.dependency_results
SuccessStepHandlerResult.success(result=data)success(result: data)this.success(data)Ok(success_result(...))
FailureStepHandlerResult.failure(...)failure(message:, ...)this.failure(msg, type, retryable)Ok(error_result(...))

Registering Handlers

Handlers are resolved by matching the handler.callable field in task template YAML. The callable format varies by language:

LanguageFormatExample
RubyModule::ClassNameHandlers::ProcessOrderHandler
Pythonmodule.file.ClassNamehandlers.process_order_handler.ProcessOrderHandler
TypeScriptClassNameProcessOrderHandler
Rustfunction_nameprocess_order

See It in Action

The example apps implement step handlers for four real-world workflows in all four languages. Compare the same handler across Rails, FastAPI, Bun, and Axum to see how each framework’s idioms map to the Tasker contract.

Next Steps