Resilient Data Pipelines with Tasker

How DAG workflows and parallel execution turn brittle ETL scripts into observable, self-healing pipelines.

The Problem

Your analytics pipeline runs nightly. It pulls sales data from your database, inventory snapshots from the warehouse system, and customer records from the CRM. Then it transforms each dataset, aggregates everything into a unified view, and generates business insights. Eight steps, chained together in a cron job.

When the warehouse API returns a 503 at 2 AM, the entire pipeline fails. Your data team discovers the gap the next morning when dashboards show stale numbers. They re-run the whole pipeline manually, even though the sales and customer extracts completed successfully the first time. The warehouse API is back up now, but you’ve lost hours of freshness and burned compute re-extracting data you already had.

The root issue isn’t the API failure — transient errors happen. The issue is that your pipeline treats independent data sources as a sequential chain, so one failure poisons everything downstream.

The Fragile Approach

A typical ETL pipeline chains everything sequentially:

def run_pipeline(config):
    sales = extract_sales(config.source)         # 1. blocks on completion
    inventory = extract_inventory(config.warehouse)  # 2. waits for sales (why?)
    customers = extract_customers(config.crm)     # 3. waits for inventory (why?)
    sales_t = transform_sales(sales)
    inventory_t = transform_inventory(inventory)
    customers_t = transform_customers(customers)
    metrics = aggregate(sales_t, inventory_t, customers_t)
    return generate_insights(metrics)

The three extract steps have no data dependency on each other, yet they run sequentially because the code is sequential. If extract #2 fails, extract #3 never starts. And there’s no retry — a single transient failure aborts the whole run.
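
For contrast, here is roughly what recovering that parallelism and retry behavior by hand looks like. This is a sketch, not the original pipeline: the thread pool, the with_retry helper, and the use of ConnectionError/TimeoutError as stand-ins for transient failures are all illustrative.

import time
from concurrent.futures import ThreadPoolExecutor

def with_retry(fn, *args, max_attempts=3, initial_delay=2, max_delay=30):
    # Hand-rolled exponential backoff; every script ends up re-implementing some version of this.
    for attempt in range(1, max_attempts + 1):
        try:
            return fn(*args)
        except (ConnectionError, TimeoutError):  # stand-ins for transient failures
            if attempt == max_attempts:
                raise
            time.sleep(min(initial_delay * 2 ** (attempt - 1), max_delay))

def run_pipeline(config):
    # Manual fan-out: the three independent extracts share a thread pool.
    with ThreadPoolExecutor(max_workers=3) as pool:
        sales_future = pool.submit(with_retry, extract_sales, config.source)
        inventory_future = pool.submit(with_retry, extract_inventory, config.warehouse)
        customers_future = pool.submit(with_retry, extract_customers, config.crm)
        sales = sales_future.result()
        inventory = inventory_future.result()
        customers = customers_future.result()

    sales_t = transform_sales(sales)
    inventory_t = transform_inventory(inventory)
    customers_t = transform_customers(customers)
    metrics = aggregate(sales_t, inventory_t, customers_t)
    return generate_insights(metrics)

Even this version keeps no record of which steps already succeeded, so a failure in a later phase still means re-running the extracts, and every new pipeline has to carry its own copy of the coordination code.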

The Tasker Approach

Tasker models this pipeline as a DAG (directed acyclic graph). Steps that don’t depend on each other run in parallel automatically. Steps that need upstream results wait only for their specific dependencies.

Task Template (YAML)

name: analytics_pipeline
namespace_name: data_pipeline
version: 1.0.0
description: "Analytics ETL pipeline with parallel extraction and aggregation"

steps:
  # EXTRACT PHASE — 3 parallel steps (no dependencies)
  - name: extract_sales_data
    description: "Extract sales records from database"
    handler:
      callable: extract_sales_data
    dependencies: []
    retry:
      retryable: true
      max_attempts: 3
      backoff: exponential
      initial_delay: 2
      max_delay: 30

  - name: extract_inventory_data
    description: "Extract inventory records from warehouse system"
    handler:
      callable: extract_inventory_data
    dependencies: []
    retry:
      retryable: true
      max_attempts: 3
      backoff: exponential
      initial_delay: 2
      max_delay: 30

  - name: extract_customer_data
    description: "Extract customer records from CRM"
    handler:
      callable: extract_customer_data
    dependencies: []
    retry:
      retryable: true
      max_attempts: 3
      backoff: exponential
      initial_delay: 2
      max_delay: 30

  # TRANSFORM PHASE — each depends only on its own extract
  - name: transform_sales
    handler:
      callable: transform_sales
    dependencies:
      - extract_sales_data
    retry:
      retryable: true
      max_attempts: 2

  - name: transform_inventory
    handler:
      callable: transform_inventory
    dependencies:
      - extract_inventory_data
    retry:
      retryable: true
      max_attempts: 2

  - name: transform_customers
    handler:
      callable: transform_customers
    dependencies:
      - extract_customer_data
    retry:
      retryable: true
      max_attempts: 2

  # AGGREGATE PHASE — waits for ALL 3 transforms (DAG convergence)
  - name: aggregate_metrics
    handler:
      callable: aggregate_metrics
    dependencies:
      - transform_sales
      - transform_inventory
      - transform_customers
    retry:
      retryable: true
      max_attempts: 2

  # INSIGHTS PHASE — depends on aggregation
  - name: generate_insights
    handler:
      callable: generate_insights
    dependencies:
      - aggregate_metrics
    retry:
      retryable: true
      max_attempts: 2

The DAG structure is visible in the dependencies field:

extract_sales_data ─────→ transform_sales ─────┐
extract_inventory_data ─→ transform_inventory ─┼─→ aggregate_metrics → generate_insights
extract_customer_data ──→ transform_customers ─┘

All three extract steps have dependencies: [], so Tasker runs them concurrently. Each transform depends only on its own extract, so transforms also run in parallel (once their extract completes). The aggregate step waits for all three transforms — this is the convergence point where parallel branches rejoin.
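
Those dependency lists are all the orchestrator needs to decide which steps may overlap. As a rough illustration of the scheduling idea (not Tasker's actual implementation), a Kahn-style pass over the template's dependencies groups the steps into execution waves:

steps = {
    "extract_sales_data": [],
    "extract_inventory_data": [],
    "extract_customer_data": [],
    "transform_sales": ["extract_sales_data"],
    "transform_inventory": ["extract_inventory_data"],
    "transform_customers": ["extract_customer_data"],
    "aggregate_metrics": ["transform_sales", "transform_inventory", "transform_customers"],
    "generate_insights": ["aggregate_metrics"],
}

def execution_waves(steps):
    # Repeatedly peel off every step whose dependencies have all completed.
    done, waves = set(), []
    while len(done) < len(steps):
        wave = [name for name, deps in steps.items()
                if name not in done and all(d in done for d in deps)]
        if not wave:
            raise ValueError("dependency cycle detected")
        waves.append(wave)
        done.update(wave)
    return waves

waves = execution_waves(steps)
# waves[0]: the three extracts (run concurrently)
# waves[1]: the three transforms (each gated only by its own extract)
# waves[2]: ["aggregate_metrics"]  (convergence point)
# waves[3]: ["generate_insights"]

In practice Tasker releases each step as soon as its own dependencies finish rather than in lockstep waves (transform_sales doesn't wait for the other extracts), but the grouping shows which steps are eligible to overlap.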

Full template: data_pipeline_analytics_pipeline.yaml

Step Handlers

ExtractSalesDataHandler — Parallel Root Step

Extract steps are “root” steps with no dependencies. They run immediately when the task starts, concurrently with other root steps.

Python (FastAPI)

class ExtractSalesDataHandler(StepHandler):
    handler_name = "extract_sales_data"
    handler_version = "1.0.0"

    PRODUCT_CATEGORIES = ["electronics", "clothing", "food", "home", "sports"]
    REGIONS = ["us-east", "us-west", "eu-central", "ap-southeast"]

    def call(self, context: StepContext) -> StepHandlerResult:
        source = context.get_input("source") or "default"
        date_start = context.get_input("date_range_start") or "2026-01-01"
        date_end = context.get_input("date_range_end") or "2026-01-31"

        records = []
        for i in range(30):
            category = random.choice(self.PRODUCT_CATEGORIES)
            region = random.choice(self.REGIONS)
            quantity = random.randint(1, 50)
            unit_price = round(random.uniform(5.0, 500.0), 2)
            revenue = round(quantity * unit_price, 2)

            records.append({
                "record_id": f"sale_{uuid.uuid4().hex[:10]}",
                "category": category,
                "region": region,
                "quantity": quantity,
                "unit_price": unit_price,
                "revenue": revenue,
            })

        total_revenue = round(sum(r["revenue"] for r in records), 2)

        return StepHandlerResult.success(
            result={
                "source": "sales_database",
                "record_count": len(records),
                "records": records,
                "total_revenue": total_revenue,
                "date_range": {"start": date_start, "end": date_end},
                "extracted_at": datetime.now(timezone.utc).isoformat(),
            },
        )

TypeScript (Bun/Hono)

export class ExtractSalesDataHandler extends StepHandler {
  static handlerName = 'DataPipeline.StepHandlers.ExtractSalesDataHandler';

  async call(context: StepContext): Promise<StepHandlerResult> {
    const dateRange = context.getInput<{ start: string; end: string }>('date_range');

    const recordCount = Math.floor(Math.random() * 500) + 100;
    const records = generateSalesRecords(recordCount);
    const totalRevenue = records.reduce((sum, r) => sum + r.value, 0);

    return this.success({
      records,
      extracted_at: new Date().toISOString(),
      source: 'SalesDatabase',
      total_amount: Math.round(totalRevenue * 100) / 100,
      record_count: recordCount,
    });
  }
}

The important detail: this handler has no get_dependency_result() calls. It reads only from the task’s initial input via get_input(). The orchestrator knows it can run this step immediately, in parallel with the other two extract steps.
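
This is also where the retry configuration earns its keep: the retry block only helps if the handler reports transient failures as retryable instead of letting the task die. Here is a minimal sketch of that failure path for the warehouse extract. It is not the example app's handler: the _fetch_warehouse_snapshot helper and the warehouse_url input are hypothetical, and it reuses the failure(...) signature from the aggregation handler below, with the error_type choice being an assumption.

class ExtractInventoryDataHandler(StepHandler):
    handler_name = "extract_inventory_data"
    handler_version = "1.0.0"

    def call(self, context: StepContext) -> StepHandlerResult:
        try:
            snapshot = self._fetch_warehouse_snapshot(context.get_input("warehouse_url"))
        except (ConnectionError, TimeoutError) as exc:
            # The 2 AM 503 lands here. Marking the failure retryable hands it to the
            # step's retry policy (3 attempts, exponential backoff) instead of
            # failing the whole pipeline.
            return StepHandlerResult.failure(
                message=f"Warehouse system unavailable: {exc}",
                error_type=ErrorType.HANDLER_ERROR,  # assumed error type
                retryable=True,
            )

        return StepHandlerResult.success(
            result={
                "source": "warehouse_system",
                "record_count": len(snapshot),
                "records": snapshot,
            },
        )

    def _fetch_warehouse_snapshot(self, url):
        # Hypothetical helper standing in for the real warehouse API client.
        raise ConnectionError("warehouse returned 503")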

Full implementations: FastAPI | Bun/Hono

AggregateMetricsHandler — Multi-Dependency Convergence

The aggregate step is the convergence point. It depends on all three transform steps and pulls results from each one.

Python (FastAPI)

class AggregateMetricsHandler(StepHandler):
    handler_name = "aggregate_metrics"
    handler_version = "1.0.0"

    def call(self, context: StepContext) -> StepHandlerResult:
        sales_transform = context.get_dependency_result("transform_sales")
        inventory_transform = context.get_dependency_result("transform_inventory")
        customers_transform = context.get_dependency_result("transform_customers")

        if not all([sales_transform, inventory_transform, customers_transform]):
            return StepHandlerResult.failure(
                message="Missing one or more transform dependency results",
                error_type=ErrorType.HANDLER_ERROR,
                retryable=False,
            )

        total_revenue = sales_transform.get("total_revenue", 0)
        total_inventory = inventory_transform.get("total_quantity_on_hand", 0)
        total_customers = customers_transform.get("record_count", 0)

        revenue_per_customer = (
            round(total_revenue / total_customers, 2) if total_customers > 0 else 0
        )
        inventory_turnover = (
            round(total_revenue / total_inventory, 4) if total_inventory > 0 else 0
        )

        return StepHandlerResult.success(
            result={
                "total_revenue": total_revenue,
                "total_inventory_quantity": total_inventory,
                "total_customers": total_customers,
                "revenue_per_customer": revenue_per_customer,
                "inventory_turnover_indicator": inventory_turnover,
                "aggregation_complete": True,
                "sources_included": 3,
                "aggregated_at": datetime.now(timezone.utc).isoformat(),
            },
        )

This handler calls get_dependency_result() three times — once for each upstream transform. The orchestrator guarantees all three have completed successfully before this step runs. If any transform failed (after exhausting its retries), this step never executes.

Full implementation: FastAPI | Bun/Hono

Creating a Task

Submitting the pipeline follows the same pattern as any Tasker workflow:

from tasker_core import TaskerClient

client = TaskerClient()
task = client.create_task(
    name="analytics_pipeline",
    namespace="data_pipeline",
    context={
        "source": "production",
        "date_range_start": "2026-01-01",
        "date_range_end": "2026-01-31",
        "granularity": "daily",
    },
)

Key Concepts

  • Parallel steps via empty dependencies: Steps with dependencies: [] are root steps that run concurrently. No threading code, no async coordination — the orchestrator handles it.
  • DAG convergence: A step that depends on multiple upstream steps waits for all of them. The aggregate_metrics step converges three parallel branches into one.
  • Multi-dependency access: get_dependency_result() retrieves the complete result from any named upstream step. The handler doesn’t need to know whether that step ran in parallel or sequentially.
  • Retry with backoff: Each step configures its own retry policy. The extract steps use 3 attempts with exponential backoff because external systems fail transiently; the transform steps use only 2 because their failures are usually deterministic, so extra attempts rarely help. The sketch after this list traces the delays the extract configuration implies.
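
As a worked example, here is the delay schedule the extract steps' retry block implies, assuming the common delay = min(initial_delay * 2^(attempt - 1), max_delay) formula (Tasker may add jitter or shape the curve differently):

def backoff_delay(attempt, initial_delay=2, max_delay=30):
    # Delay applied after a failed attempt; attempt is 1-based.
    return min(initial_delay * 2 ** (attempt - 1), max_delay)

# With max_attempts: 3, only the first two failures trigger a wait:
#   attempt 1 fails -> wait backoff_delay(1) = 2s
#   attempt 2 fails -> wait backoff_delay(2) = 4s
#   attempt 3 fails -> retries are exhausted and the step is marked failed
# The max_delay: 30 cap only matters on longer schedules (attempt 5 onward).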

Full Implementations

The complete analytics pipeline is implemented in all four supported languages; the Example Apps page links to each implementation.

What’s Next

Parallel extraction is powerful, but real-world workflows often have a diamond pattern — a step that fans out to parallel branches that must converge before continuing. In Post 03: Microservices Coordination, we’ll build a user registration workflow where account creation fans out to billing and preferences setup in parallel, then converges for the welcome sequence — demonstrating how Tasker replaces custom circuit breakers with declarative dependency management.


See this pattern implemented in all four frameworks on the Example Apps page.