Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

StepContext API Reference

StepContext is the primary data access object for step handlers across all languages in the Tasker worker ecosystem. It provides a consistent interface for accessing task inputs, dependency results, configuration, and checkpoint data.

Overview

Every step handler receives a StepContext (or TaskSequenceStep in Rust) that contains:

  • Task context - Input data for the workflow (JSONB from task.context)
  • Dependency results - Results from upstream DAG steps
  • Step configuration - Handler-specific settings from the template
  • Checkpoint data - Batch processing state for resumability
  • Retry information - Current attempt count and max retries

Cross-Language API Reference

Core Data Access

OperationRustRubyPythonTypeScript
Get task inputget_input::<T>("key")?get_input("key")get_input("key")getInput("key")
Get input with defaultget_input_or("key", default)get_input_or("key", default)get_input_or("key", default)getInputOr("key", default)
Get config valueget_config::<T>("key")?get_config("key")get_config("key")getConfig("key")
Get dependency resultget_dependency_result_column_value::<T>("step")?get_dependency_result("step")get_dependency_result("step")getDependencyResult("step")
Get nested dependency fieldget_dependency_field::<T>("step", &["path"])?get_dependency_field("step", *path)get_dependency_field("step", *path)getDependencyField("step", ...path)

Retry Helpers

OperationRustRubyPythonTypeScript
Check if retryis_retry()is_retry?is_retry()isRetry()
Check if last retryis_last_retry()is_last_retry?is_last_retry()isLastRetry()
Get retry countretry_count()retry_countretry_countretryCount
Get max retriesmax_retries()max_retriesmax_retriesmaxRetries

Checkpoint Access

OperationRustRubyPythonTypeScript
Get raw checkpointcheckpoint()checkpointcheckpointcheckpoint
Get cursorcheckpoint_cursor::<T>()checkpoint_cursorcheckpoint_cursorcheckpointCursor
Get items processedcheckpoint_items_processed()checkpoint_items_processedcheckpoint_items_processedcheckpointItemsProcessed
Get accumulated resultsaccumulated_results::<T>()accumulated_resultsaccumulated_resultsaccumulatedResults
Check has checkpointhas_checkpoint()has_checkpoint?has_checkpoint()hasCheckpoint()

Standard Fields

FieldRustRubyPythonTypeScript
Task UUIDtask.task.task_uuidtask_uuidtask_uuidtaskUuid
Step UUIDworkflow_step.workflow_step_uuidstep_uuidstep_uuidstepUuid
Correlation IDtask.task.correlation_idtask.correlation_idcorrelation_idcorrelationId
Input data (raw)task.task.contextinput_data / contextinput_datainputData
Step config (raw)step_definition.handler.initializationstep_configstep_configstepConfig

Usage Examples

Rust

#![allow(unused)]
fn main() {
use tasker_shared::types::base::TaskSequenceStep;

async fn call(&self, step_data: &TaskSequenceStep) -> Result<StepExecutionResult> {
    // Get task input
    let order_id: String = step_data.get_input("order_id")?;
    let batch_size: i32 = step_data.get_input_or("batch_size", 100);

    // Get config
    let api_url: String = step_data.get_config("api_url")?;

    // Get dependency result
    let validation_result: ValidationResult = step_data.get_dependency_result_column_value("validate")?;

    // Extract nested field from dependency
    let item_count: i32 = step_data.get_dependency_field("process", &["stats", "count"])?;

    // Check retry status
    if step_data.is_retry() {
        println!("Retry attempt {}", step_data.retry_count());
    }

    // Resume from checkpoint
    let cursor: Option<i64> = step_data.checkpoint_cursor();
    let start_from = cursor.unwrap_or(0);

    // ... handler logic ...
}
}

Ruby

def call(context)
  # Get task input
  order_id = context.get_input('order_id')
  batch_size = context.get_input_or('batch_size', 100)

  # Get config
  api_url = context.get_config('api_url')

  # Get dependency result
  validation_result = context.get_dependency_result('validate')

  # Extract nested field from dependency
  item_count = context.get_dependency_field('process', 'stats', 'count')

  # Check retry status
  if context.is_retry?
    logger.info("Retry attempt #{context.retry_count}")
  end

  # Resume from checkpoint
  start_from = context.checkpoint_cursor || 0

  # ... handler logic ...
end

Python

def call(self, context: StepContext) -> StepHandlerResult:
    # Get task input
    order_id = context.get_input("order_id")
    batch_size = context.get_input_or("batch_size", 100)

    # Get config
    api_url = context.get_config("api_url")

    # Get dependency result
    validation_result = context.get_dependency_result("validate")

    # Extract nested field from dependency
    item_count = context.get_dependency_field("process", "stats", "count")

    # Check retry status
    if context.is_retry():
        print(f"Retry attempt {context.retry_count}")

    # Resume from checkpoint
    start_from = context.checkpoint_cursor or 0

    # ... handler logic ...

TypeScript

async call(context: StepContext): Promise<StepHandlerResult> {
  // Get task input
  const orderId = context.getInput<string>('order_id');
  const batchSize = context.getInputOr('batch_size', 100);

  // Get config
  const apiUrl = context.getConfig<string>('api_url');

  // Get dependency result
  const validationResult = context.getDependencyResult('validate');

  // Extract nested field from dependency
  const itemCount = context.getDependencyField('process', 'stats', 'count');

  // Check retry status
  if (context.isRetry()) {
    console.log(`Retry attempt ${context.retryCount}`);
  }

  // Resume from checkpoint
  const startFrom = context.checkpointCursor ?? 0;

  // ... handler logic ...
}

Checkpoint Usage Guide

Checkpoints enable resumable batch processing. When a handler processes large datasets, it can save progress via checkpoints and resume from where it left off on retry.

Checkpoint Fields

  • cursor - Position marker (can be int, string, or object)
  • items_processed - Count of items completed
  • accumulated_results - Running totals or aggregated state

Reading Checkpoints

# Python example
def call(self, context: StepContext) -> StepHandlerResult:
    # Check if resuming from checkpoint
    if context.has_checkpoint():
        cursor = context.checkpoint_cursor
        items_done = context.checkpoint_items_processed
        totals = context.accumulated_results or {}
        print(f"Resuming from cursor {cursor}, {items_done} items done")
    else:
        cursor = 0
        items_done = 0
        totals = {}

    # Process from cursor position...

Writing Checkpoints

Checkpoints are written by including checkpoint data in the handler result metadata. See the batch processing documentation for details on the checkpoint yield pattern.

Notes

  • All accessor methods handle missing data gracefully (return None/null or use defaults)
  • Dependency results are automatically unwrapped from the {"result": value} envelope
  • Type conversion is handled automatically where supported (Rust, TypeScript generics)
  • Checkpoint data is persisted atomically by the CheckpointService