Resilient Data Pipelines with Tasker

How DAG workflows and parallel execution turn brittle ETL scripts into observable, self-healing pipelines.

Handler examples use Python DSL syntax. See Class-Based Handlers for the class-based alternative. Full implementations in all four languages are linked at the bottom.

The Problem

Your analytics pipeline runs nightly. It pulls sales data from your database, inventory snapshots from the warehouse system, and customer records from the CRM. Then it transforms each dataset, aggregates everything into a unified view, and generates business insights. Eight steps, chained together in a cron job.

When the warehouse API returns a 503 at 2 AM, the entire pipeline fails. Your data team discovers the gap the next morning when dashboards show stale numbers. They re-run the whole pipeline manually, even though the sales and customer extracts completed successfully the first time. The warehouse API is back up now, but you’ve lost hours of freshness and burned compute re-extracting data you already had.

The root issue isn’t the API failure — transient errors happen. The issue is that your pipeline treats independent data sources as a sequential chain, so one failure poisons everything downstream.

The Fragile Approach

A typical ETL pipeline chains everything sequentially:

def run_pipeline(config):
    sales = extract_sales(config.source)         # 1. blocks on completion
    inventory = extract_inventory(config.warehouse)  # 2. waits for sales (why?)
    customers = extract_customers(config.crm)     # 3. waits for inventory (why?)
    sales_t = transform_sales(sales)
    inventory_t = transform_inventory(inventory)
    customers_t = transform_customers(customers)
    metrics = aggregate(sales_t, inventory_t, customers_t)
    return generate_insights(metrics)

The three extract steps have no data dependency on each other, yet they run sequentially because the code is sequential. If extract #2 fails, extract #3 never starts. And there’s no retry — a single transient failure aborts the whole run.
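Recovering that lost concurrency by hand means writing threading code yourself. A sketch of what that looks like with `concurrent.futures` (the stub extract functions are illustrative stand-ins for the real I/O-bound calls):

```python
from concurrent.futures import ThreadPoolExecutor

# Stub extracts standing in for the real I/O-bound calls (illustrative only)
def extract_sales(source): return {"rows": 100}
def extract_inventory(warehouse): return {"rows": 50}
def extract_customers(crm): return {"rows": 75}

def run_extracts(config):
    # Manual fan-out: submit all three independent extracts at once
    with ThreadPoolExecutor(max_workers=3) as pool:
        sales_f = pool.submit(extract_sales, config["source"])
        inventory_f = pool.submit(extract_inventory, config["warehouse"])
        customers_f = pool.submit(extract_customers, config["crm"])
        # Manual fan-in: block until every future resolves. An exception
        # raised in any extract still aborts the whole run, with no retry.
        return sales_f.result(), inventory_f.result(), customers_f.result()
```

This buys back parallelism but still leaves retries, partial re-runs, and observability unsolved — coordination burden that only grows as the pipeline does.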

The Tasker Approach

Tasker models this pipeline as a DAG (directed acyclic graph). Steps that don’t depend on each other run in parallel automatically. Steps that need upstream results wait only for their specific dependencies.

Task Template (YAML)

name: analytics_pipeline
namespace_name: data_pipeline
version: 1.0.0
description: "Analytics ETL pipeline with parallel extraction and aggregation"

steps:
  # EXTRACT PHASE — 3 parallel steps (no dependencies)
  - name: extract_sales_data
    description: "Extract sales records from database"
    handler:
      callable: extract_sales_data
    dependencies: []
    retry:
      retryable: true
      max_attempts: 3
      backoff: exponential
      initial_delay: 2
      max_delay: 30

  - name: extract_inventory_data
    description: "Extract inventory records from warehouse system"
    handler:
      callable: extract_inventory_data
    dependencies: []
    retry:
      retryable: true
      max_attempts: 3
      backoff: exponential
      initial_delay: 2
      max_delay: 30

  - name: extract_customer_data
    description: "Extract customer records from CRM"
    handler:
      callable: extract_customer_data
    dependencies: []
    retry:
      retryable: true
      max_attempts: 3
      backoff: exponential
      initial_delay: 2
      max_delay: 30

  # TRANSFORM PHASE — each depends only on its own extract
  - name: transform_sales
    handler:
      callable: transform_sales
    dependencies:
      - extract_sales_data
    retry:
      retryable: true
      max_attempts: 2

  - name: transform_inventory
    handler:
      callable: transform_inventory
    dependencies:
      - extract_inventory_data
    retry:
      retryable: true
      max_attempts: 2

  - name: transform_customers
    handler:
      callable: transform_customers
    dependencies:
      - extract_customer_data
    retry:
      retryable: true
      max_attempts: 2

  # AGGREGATE PHASE — waits for ALL 3 transforms (DAG convergence)
  - name: aggregate_metrics
    handler:
      callable: aggregate_metrics
    dependencies:
      - transform_sales
      - transform_inventory
      - transform_customers
    retry:
      retryable: true
      max_attempts: 2

  # INSIGHTS PHASE — depends on aggregation
  - name: generate_insights
    handler:
      callable: generate_insights
    dependencies:
      - aggregate_metrics
    retry:
      retryable: true
      max_attempts: 2

The DAG structure is visible in the dependencies field:

extract_sales_data ────→ transform_sales ─────┐
extract_inventory_data → transform_inventory ─┼─→ aggregate_metrics → generate_insights
extract_customer_data ─→ transform_customers ─┘

All three extract steps have dependencies: [], so Tasker runs them concurrently. Each transform depends only on its own extract, so transforms also run in parallel (once their extract completes). The aggregate step waits for all three transforms — this is the convergence point where parallel branches rejoin.
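The scheduling rule behind this is simple: a step is ready when all of its dependencies have completed. A toy model of that readiness check (not Tasker's actual implementation — just the idea, using the dependency map from the template above):

```python
# Dependency map mirroring the YAML template above
DEPS = {
    "extract_sales_data": [],
    "extract_inventory_data": [],
    "extract_customer_data": [],
    "transform_sales": ["extract_sales_data"],
    "transform_inventory": ["extract_inventory_data"],
    "transform_customers": ["extract_customer_data"],
    "aggregate_metrics": ["transform_sales", "transform_inventory",
                          "transform_customers"],
    "generate_insights": ["aggregate_metrics"],
}

def ready_steps(completed: set) -> set:
    """Steps whose dependencies are all satisfied and which haven't run yet."""
    return {
        step for step, deps in DEPS.items()
        if step not in completed and all(d in completed for d in deps)
    }
```

At task start, `ready_steps(set())` returns all three extracts at once; `aggregate_metrics` only becomes ready after all three transforms appear in the completed set.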

Full template: data_pipeline_analytics_pipeline.yaml

Step Handlers

ExtractSalesDataHandler — Parallel Root Step

Extract steps are “root” steps with no dependencies. They run immediately when the task starts, concurrently with other root steps.

from tasker_core.step_handler.functional import step_handler, inputs
from app.services.types import DataPipelineInput
from app.services import data_pipeline as svc

@step_handler("extract_sales_data")
@inputs(DataPipelineInput)
def extract_sales_data(inputs: DataPipelineInput, context):
    return svc.extract_sales_data(
        source=inputs.source,
        date_range_start=inputs.date_range_start,
        date_range_end=inputs.date_range_end,
        granularity=inputs.granularity,
    )

The important detail: this handler uses only @inputs (no @depends_on). It reads from the task’s initial input — no upstream step results needed. The orchestrator knows it can run this step immediately, in parallel with the other two extract steps. The service function contains the actual data extraction logic.
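The real `DataPipelineInput` lives in `app.services.types`; a plausible shape, sketched here as a plain dataclass (field names are taken from the handler above, but the class definition itself is an assumption — the project may use Pydantic or another validation layer):

```python
from dataclasses import dataclass

@dataclass
class DataPipelineInput:
    # Fields mirror the task context submitted at creation time
    # (names from the handler above; the shape is an assumption)
    source: str            # e.g. "production"
    date_range_start: str  # ISO date, e.g. "2026-01-01"
    date_range_end: str    # ISO date, e.g. "2026-01-31"
    granularity: str       # e.g. "daily"
```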

Full implementations: FastAPI | Bun/Hono

AggregateMetricsHandler — Multi-Dependency Convergence

The aggregate step is the convergence point. It depends on all three transform steps and pulls typed results from each one via @depends_on.

from tasker_core.step_handler.functional import step_handler, depends_on
from app.services.types import (
    PipelineTransformSalesResult,
    PipelineTransformInventoryResult,
    PipelineTransformCustomersResult,
)
from app.services import data_pipeline as svc

@step_handler("aggregate_metrics")
@depends_on(
    sales_transform=("transform_sales", PipelineTransformSalesResult),
    inventory_transform=("transform_inventory", PipelineTransformInventoryResult),
    customers_transform=("transform_customers", PipelineTransformCustomersResult),
)
def aggregate_metrics(
    sales_transform: PipelineTransformSalesResult,
    inventory_transform: PipelineTransformInventoryResult,
    customers_transform: PipelineTransformCustomersResult,
    context,
):
    return svc.aggregate_metrics(
        sales_transform=sales_transform,
        inventory_transform=inventory_transform,
        customers_transform=customers_transform,
    )

Three @depends_on entries compose the function signature — each injects a typed result from an upstream transform step. The orchestrator guarantees all three have completed successfully before this step runs. If any transform failed (after exhausting its retries), this step never executes. The service function contains the cross-source aggregation logic.

Full implementations: FastAPI | Bun/Hono

Creating a Task

Submitting the pipeline follows the same pattern as any Tasker workflow:

from tasker_core import TaskerClient

client = TaskerClient()
task = client.create_task(
    name="analytics_pipeline",
    namespace="data_pipeline",
    context={
        "source": "production",
        "date_range_start": "2026-01-01",
        "date_range_end": "2026-01-31",
        "granularity": "daily",
    },
)

Key Concepts

  • Parallel steps via empty dependencies: Steps with dependencies: [] are root steps that run concurrently. No threading code, no async coordination — the orchestrator handles it.
  • DAG convergence: A step that depends on multiple upstream steps waits for all of them. The aggregate_metrics step converges three parallel branches into one.
  • Typed dependency injection: @depends_on injects typed upstream results directly into the function signature. The handler receives validated data — no manual get_dependency_result() calls or key lookups.
  • Retry with backoff: Each step configures its own retry policy. The extract steps use 3 attempts with exponential backoff because external systems have transient failures. Transform steps use 2 attempts because they’re CPU-bound and unlikely to benefit from retrying.
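To make the extract steps' policy concrete (`initial_delay: 2`, `max_delay: 30`, exponential): a sketch of the resulting delay schedule, assuming the delay doubles per attempt and is capped at `max_delay` (Tasker's exact formula and any jitter may differ):

```python
def backoff_delays(max_attempts: int, initial_delay: float, max_delay: float) -> list:
    """Delay before each retry, doubling from initial_delay, capped at max_delay.

    max_attempts total attempts means max_attempts - 1 waits between them.
    (Assumed semantics; the real scheduler may add jitter.)
    """
    return [min(initial_delay * 2 ** i, max_delay) for i in range(max_attempts - 1)]
```

With the template's settings, `backoff_delays(3, 2, 30)` yields waits of 2s and 4s between the three attempts; with more attempts the cap kicks in (e.g. `backoff_delays(6, 2, 30)` gives 2, 4, 8, 16, 30).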

Full Implementations

The complete analytics pipeline is implemented in all four supported languages; see the Example Apps page linked below.

What’s Next

Parallel extraction is powerful, but real-world workflows often have a diamond pattern — a step that fans out to parallel branches that must converge before continuing. In Post 03: Microservices Coordination, we’ll build a user registration workflow where account creation fans out to billing and preferences setup in parallel, then converges for the welcome sequence — demonstrating how Tasker replaces custom circuit breakers with declarative dependency management.


See this pattern implemented in all four frameworks on the Example Apps page.