Worker Crates: Common Patterns and Practices

Last Updated: 2026-01-06 • Audience: Developers, Architects • Status: Active • Related Docs: Worker Event Systems | Worker Actors

This document describes the common patterns and practices shared across all three worker implementations (Rust, Ruby, Python). Understanding these patterns helps developers write consistent handlers regardless of the language.

Architectural Patterns

Dual-Channel Architecture

All workers implement a dual-channel architecture for non-blocking step execution:

┌─────────────────────────────────────────────────────────────────┐
│                    DUAL-CHANNEL PATTERN                         │
└─────────────────────────────────────────────────────────────────┘

    PostgreSQL PGMQ
          │
          ▼
  ┌───────────────────┐
  │  Dispatch Channel │  ──→  Step events flow TO handlers
  └───────────────────┘
          │
          ▼
  ┌───────────────────┐
  │  Handler Execution │  ──→  Business logic runs here
  └───────────────────┘
          │
          ▼
  ┌───────────────────┐
  │ Completion Channel │  ──→  Results flow BACK to orchestration
  └───────────────────┘
          │
          ▼
    Orchestration

Benefits:

  • Fire-and-forget dispatch (non-blocking)
  • Bounded concurrency via semaphores
  • Results processed independently from dispatch
  • Consistent pattern across all languages
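
As a rough illustration, the pattern can be sketched in Python with two asyncio queues and a semaphore. This is not the actual worker code; the queue, handler, and submit_result names below are placeholders.

import asyncio

# Sketch of the dual-channel pattern: events are dispatched fire-and-forget,
# a semaphore bounds concurrency, and results flow back on a separate channel.
dispatch_queue: asyncio.Queue = asyncio.Queue()    # Dispatch Channel
completion_queue: asyncio.Queue = asyncio.Queue()  # Completion Channel
semaphore = asyncio.Semaphore(10)                  # Bounded concurrency

async def dispatch_loop(handler):
    while True:
        event = await dispatch_queue.get()
        # Fire-and-forget: dispatch never blocks on handler execution
        asyncio.create_task(run_handler(handler, event))

async def run_handler(handler, event):
    async with semaphore:
        result = handler(event)                       # Business logic runs here
        await completion_queue.put((event, result))   # Result flows back

async def completion_loop(submit_result):
    while True:
        event, result = await completion_queue.get()
        submit_result(event, result)                  # Hand result to orchestration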

Language-Specific Implementations

| Component | Rust | Ruby | Python |
|---|---|---|---|
| Dispatch Channel | mpsc::channel | poll_step_events FFI | poll_step_events FFI |
| Completion Channel | mpsc::channel | complete_step_event FFI | complete_step_event FFI |
| Concurrency Model | Tokio async tasks | Ruby threads + FFI polling | Python threads + FFI polling |
| GVL/GIL Handling | N/A | Pull-based polling | Pull-based polling |

Handler Lifecycle

Handler Registration

All implementations follow the same registration pattern:

1. Define handler class/struct
2. Set handler_name identifier
3. Register with HandlerRegistry
4. Handler ready for resolution

Ruby Example:

class ProcessOrderHandler < TaskerCore::StepHandler::Base
  def call(context)
    # Access data via cross-language standard methods
    order_id = context.get_task_field('order_id')

    # Business logic here...

    # Return result using base class helper (keyword args required)
    success(result: { order_id: order_id, status: 'processed' })
  end
end

# Registration
registry = TaskerCore::Registry::HandlerRegistry.instance
registry.register_handler('ProcessOrderHandler', ProcessOrderHandler)

Python Example:

from tasker_core import StepHandler, StepHandlerResult, HandlerRegistry

class ProcessOrderHandler(StepHandler):
    handler_name = "process_order"

    def call(self, context):
        order_id = context.input_data.get("order_id")
        return StepHandlerResult.success_handler_result(
            {"order_id": order_id, "status": "processed"}
        )

# Registration
registry = HandlerRegistry.instance()
registry.register("process_order", ProcessOrderHandler)

Handler Resolution Flow

1. Step event received with handler name
2. Registry.resolve(handler_name) called
3. Handler class instantiated
4. handler.call(context) invoked
5. Result returned to completion channel
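
A hedged sketch of this flow in Python (registry.resolve and complete_step_event come from the APIs described in this document; the event field names and the process_step_event wrapper are illustrative):

def process_step_event(event, registry, complete_step_event):
    # 1-2. Resolve the handler class registered under the event's handler name
    handler_class = registry.resolve(event.handler_name)

    # 3. Instantiate the handler
    handler = handler_class()

    # 4. Invoke the handler with the step context
    result = handler.call(event.context)

    # 5. Return the result to the completion channel
    complete_step_event(event.event_id, result)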

Handler Context

All handlers receive a context object containing:

| Field | Description |
|---|---|
| task_uuid | Unique identifier for the task |
| step_uuid | Unique identifier for the step |
| input_data | Task context data passed to the step |
| dependency_results | Results from parent/dependency steps |
| step_config | Configuration from step definition |
| step_inputs | Runtime inputs from workflow_step.inputs |
| retry_count | Current retry attempt number |
| max_retries | Maximum retry attempts allowed |
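
As an illustration only (the concrete context class differs per language), the Python context could be modeled as a dataclass with these fields; the default values shown are assumptions:

from dataclasses import dataclass, field
from typing import Any

@dataclass
class StepContext:
    """Illustrative shape of the handler context; field names follow the table above."""
    task_uuid: str
    step_uuid: str
    input_data: dict[str, Any] = field(default_factory=dict)
    dependency_results: dict[str, Any] = field(default_factory=dict)
    step_config: dict[str, Any] = field(default_factory=dict)
    step_inputs: dict[str, Any] = field(default_factory=dict)
    retry_count: int = 0
    max_retries: int = 3  # Assumption: the actual default comes from the step definition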

Handler Results

All handlers return a structured result indicating success or failure. However, the APIs differ between Ruby and Python - this is a known design inconsistency that may be addressed in a future ticket.

Ruby - Uses keyword arguments and separate Success/Error types:

# Via base handler shortcuts
success(result: { key: "value" }, metadata: { duration_ms: 150 })

failure(
  message: "Something went wrong",
  error_type: "PermanentError",
  error_code: "VALIDATION_ERROR",  # Ruby has error_code field
  retryable: false,
  metadata: { field: "email" }
)

# Or via type factory methods
TaskerCore::Types::StepHandlerCallResult.success(result: { key: "value" })
TaskerCore::Types::StepHandlerCallResult.error(
  error_type: "PermanentError",
  message: "Error message",
  error_code: "ERR_001"
)

Python - Uses positional/keyword arguments and a single result type:

# Via base handler shortcuts
self.success(result={"key": "value"}, metadata={"duration_ms": 150})

self.failure(
    message="Something went wrong",
    error_type="ValidationError",  # Python has error_type only (no error_code)
    retryable=False,
    metadata={"field": "email"}
)

# Or via class factory methods
StepHandlerResult.success_handler_result(
    {"key": "value"},             # First positional arg is result
    {"duration_ms": 150}          # Second positional arg is metadata
)
StepHandlerResult.failure_handler_result(
    message="Something went wrong",
    error_type="ValidationError",
    retryable=False,
    metadata={"field": "email"}
)

Key Differences:

| Aspect | Ruby | Python |
|---|---|---|
| Factory method names | .success(), .error() | .success_handler_result(), .failure_handler_result() |
| Result type | Success / Error structs | Single StepHandlerResult class |
| Error code field | error_code (freeform) | Not present |
| Argument style | Keyword required (result:) | Positional allowed |

Error Handling

Error Classification

All workers classify errors into two categories:

| Type | Description | Behavior |
|---|---|---|
| Retryable | Transient errors that may succeed on retry | Step re-enqueued up to max_retries |
| Permanent | Unrecoverable errors | Step marked as failed immediately |

HTTP Status Code Classification (ApiHandler)

400, 401, 403, 404, 422  →  Permanent Error (client errors)
429                       →  Retryable Error (rate limiting)
500-599                   →  Retryable Error (server errors)
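
A minimal sketch of this classification in Python (classify_http_status is an illustrative helper, not part of the ApiHandler API; the fallback for unlisted statuses is an assumption):

PERMANENT_STATUSES = {400, 401, 403, 404, 422}

def classify_http_status(status: int) -> bool:
    """Return True if the status should be treated as retryable."""
    if status in PERMANENT_STATUSES:
        return False                    # Client errors: fail immediately
    if status == 429 or 500 <= status <= 599:
        return True                     # Rate limiting and server errors: retry
    return False                        # Assumed fallback for unlisted statuses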

Exception Hierarchy

Ruby:

TaskerCore::Error                  # Base class
├── TaskerCore::RetryableError     # Transient failures
├── TaskerCore::PermanentError     # Unrecoverable failures
├── TaskerCore::FFIError           # FFI bridge errors
└── TaskerCore::ConfigurationError # Configuration issues

Python:

TaskerError                        # Base class
├── WorkerNotInitializedError      # Worker not bootstrapped
├── WorkerBootstrapError           # Bootstrap failed
├── WorkerAlreadyRunningError      # Double initialization
├── FFIError                       # FFI bridge errors
├── ConversionError                # Type conversion errors
└── StepExecutionError             # Handler execution errors

Error Context Propagation

All errors should include context for debugging:

StepHandlerResult.failure_handler_result(
    message="Payment gateway timeout",
    error_type="gateway_timeout",
    retryable=True,
    metadata={
        "gateway": "stripe",
        "request_id": "req_xyz",
        "response_time_ms": 30000
    }
)

Polling Architecture

Why Polling?

Ruby and Python workers use a pull-based polling model due to language runtime constraints:

Ruby: The Global VM Lock (GVL) prevents Rust from safely calling Ruby methods from Rust threads. Polling allows Ruby to control thread context.

Python: The Global Interpreter Lock (GIL) has the same limitation. Python must initiate all cross-language calls.

Polling Characteristics

| Parameter | Default Value | Description |
|---|---|---|
| Poll Interval | 10ms | Time between polls when no events |
| Max Latency | ~10ms | Time from event generation to processing start |
| Starvation Check | Every 100 polls (1 second) | Detect processing bottlenecks |
| Cleanup Interval | Every 1000 polls (10 seconds) | Clean up timed-out events |

Poll Loop Structure

poll_count = 0

while running:
    poll_count += 1

    # 1. Poll for event
    event = poll_step_events()

    if event:
        # 2. Process event through handler
        process_event(event)
    else:
        # 3. Sleep when no events
        time.sleep(0.01)  # 10ms

    # 4. Periodic maintenance
    if poll_count % 100 == 0:
        check_starvation_warnings()

    if poll_count % 1000 == 0:
        cleanup_timeouts()

FFI Contract

Ruby and Python share the same FFI contract:

| Function | Description |
|---|---|
| poll_step_events() | Get next pending event (returns None if empty) |
| complete_step_event(event_id, result) | Submit handler result |
| get_ffi_dispatch_metrics() | Get dispatch channel metrics |
| check_starvation_warnings() | Trigger starvation logging |
| cleanup_timeouts() | Clean up timed-out events |
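
For reference, the shared contract can be expressed as a Python Protocol; the payload types below are placeholders rather than the actual FFI types:

from typing import Any, Optional, Protocol

class WorkerFfi(Protocol):
    """Shape of the FFI contract shared by the Ruby and Python workers."""

    def poll_step_events(self) -> Optional[Any]:
        """Get the next pending event, or None if the queue is empty."""

    def complete_step_event(self, event_id: str, result: Any) -> None:
        """Submit a handler result back to orchestration."""

    def get_ffi_dispatch_metrics(self) -> Any:
        """Return dispatch channel metrics."""

    def check_starvation_warnings(self) -> None:
        """Trigger starvation logging."""

    def cleanup_timeouts(self) -> None:
        """Clean up timed-out events."""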

Event Bridge Pattern

Overview

All workers implement an EventBridge (pub/sub) pattern for internal coordination:

┌─────────────────────────────────────────────────────────────────┐
│                      EVENT BRIDGE PATTERN                        │
└─────────────────────────────────────────────────────────────────┘

  Publishers                    EventBridge                 Subscribers
  ─────────                    ───────────                 ───────────
  HandlerRegistry  ──publish──→            ──notify──→  StepExecutionSubscriber
  EventPoller      ──publish──→  [Events]  ──notify──→  MetricsCollector
  Worker           ──publish──→            ──notify──→  Custom Subscribers

Standard Event Names

| Event | Description | Payload |
|---|---|---|
| handler_registered | Handler added to registry | (name, handler_class) |
| step_execution_received | Step event received | FfiStepEvent |
| step_execution_completed | Handler finished | StepHandlerResult |
| worker_started | Worker bootstrap complete | worker_id |
| worker_stopped | Worker shutdown | worker_id |

Implementation Libraries

| Language | Library | Pattern |
|---|---|---|
| Ruby | dry-events | Publisher/Subscriber |
| Python | pyee | EventEmitter |
| Rust | Native channels | mpsc |

Usage Example (Python)

from tasker_core import EventBridge, EventNames

bridge = EventBridge.instance()

# Subscribe to events
def on_step_received(event):
    print(f"Processing step {event.step_uuid}")

bridge.subscribe(EventNames.STEP_EXECUTION_RECEIVED, on_step_received)

# Publish events
bridge.publish(EventNames.HANDLER_REGISTERED, "my_handler", MyHandler)

Singleton Pattern

Worker State Management

All workers store global state in a thread-safe singleton:

┌─────────────────────────────────────────────────────────────────┐
│                    SINGLETON WORKER STATE                        │
└─────────────────────────────────────────────────────────────────┘

    Thread-Safe Global
           │
           ▼
    ┌──────────────────┐
    │   WorkerSystem   │
    │  ┌────────────┐  │
    │  │ Mutex/Lock │  │
    │  │  Inner     │  │
    │  │  State     │  │
    │  └────────────┘  │
    └──────────────────┘
           │
           ├──→ HandlerRegistry
           ├──→ EventBridge
           ├──→ EventPoller
           └──→ Configuration

Singleton Classes

| Language | Singleton Implementation |
|---|---|
| Rust | OnceLock<Mutex<WorkerSystem>> |
| Ruby | Singleton module |
| Python | Class-level _instance with instance() classmethod |
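
A minimal sketch of the Python variant (class-level _instance with instance() and reset_instance() classmethods; the locking shown here is illustrative):

import threading

class WorkerSystem:
    _instance = None
    _lock = threading.Lock()

    @classmethod
    def instance(cls) -> "WorkerSystem":
        # Double-checked locking keeps instance() cheap after the first call
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    @classmethod
    def reset_instance(cls) -> None:
        # Test isolation hook (see "Reset for Testing" below)
        with cls._lock:
            cls._instance = None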

Reset for Testing

All singletons provide reset methods for test isolation:

# Python
HandlerRegistry.reset_instance()
EventBridge.reset_instance()

# Ruby
TaskerCore::Registry::HandlerRegistry.reset_instance!

Observability

Health Checks

All workers expose health information via FFI:

from tasker_core import get_health_check

health = get_health_check()
# Returns: HealthCheck with component statuses

Metrics

Standard metrics available from all workers:

| Metric | Description |
|---|---|
| pending_count | Events awaiting processing |
| in_flight_count | Events currently being processed |
| completed_count | Successfully completed events |
| failed_count | Failed events |
| starvation_detected | Whether events are timing out |
| starving_event_count | Events exceeding timeout threshold |
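
A hedged usage sketch, assuming get_ffi_dispatch_metrics() is exported from tasker_core and returns an object exposing the fields above:

from tasker_core import get_ffi_dispatch_metrics

metrics = get_ffi_dispatch_metrics()
print(f"pending={metrics.pending_count} in_flight={metrics.in_flight_count}")

if metrics.starvation_detected:
    # Events are exceeding the timeout threshold; check handler throughput
    print(f"starving events: {metrics.starving_event_count}")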

Structured Logging

All workers use structured logging with consistent fields:

from tasker_core import log_info, LogContext

context = LogContext(
    correlation_id="abc-123",
    task_uuid="task-456",
    operation="process_order"
)
log_info("Processing order", context)

Specialized Handlers

Handler Type Hierarchy

Ruby (all are subclasses):

TaskerCore::StepHandler::Base
├── TaskerCore::StepHandler::Api        # HTTP/REST API integration
├── TaskerCore::StepHandler::Decision   # Dynamic workflow decisions
└── TaskerCore::StepHandler::Batchable  # Batch processing support

Python (Batchable is a mixin):

StepHandler (ABC)
├── ApiHandler         # HTTP/REST API integration (subclass)
├── DecisionHandler    # Dynamic workflow decisions (subclass)
└── + Batchable        # Batch processing (mixin via multiple inheritance)

ApiHandler

For HTTP API integration with automatic error classification (Python example):

class FetchUserHandler(ApiHandler):
    handler_name = "fetch_user"

    def call(self, context):
        response = self.get(f"/users/{context.input_data['user_id']}")
        return self.success(result=response.json())

DecisionHandler

For dynamic workflow routing (Ruby example):

class RoutingDecisionHandler < TaskerCore::StepHandler::Decision
  def call(context)
    amount = context.get_task_field('amount')

    if amount < 1000
      decision_success(steps: ['auto_approve'], result_data: { route: 'auto' })
    else
      decision_success(steps: ['manager_approval', 'finance_review'])
    end
  end
end

Batchable

For processing large datasets in chunks. Note: Ruby uses subclass inheritance, Python uses mixin.

Ruby (subclass):

class CsvBatchProcessorHandler < TaskerCore::StepHandler::Batchable
  def call(context)
    batch_ctx = get_batch_context(context)
    no_op_result = handle_no_op_worker(batch_ctx)
    return no_op_result if no_op_result

    # Process records using batch_ctx.start_cursor, batch_ctx.end_cursor
    batch_worker_complete(processed_count: batch_ctx.batch_size)
  end
end

Python (mixin):

class CsvProcessorHandler(StepHandler, Batchable):
    handler_name = "csv_processor"

    def call(self, context: StepContext) -> StepHandlerResult:
        batch_ctx = self.get_batch_context(context)
        # Process records using batch_ctx.start_cursor, batch_ctx.end_cursor
        return self.batch_worker_success(processed_count=batch_ctx.batch_size)

Checkpoint Yielding

Checkpoint yielding enables batch workers to persist progress and yield control back to the orchestrator for re-dispatch. This is essential for long-running batch operations.

When to Use

  • Processing takes longer than visibility timeout
  • You need resumable processing after failures
  • Long-running operations need progress visibility

Cross-Language API

All Batchable handlers provide checkpoint_yield() (or checkpointYield() in TypeScript):

Ruby:

class MyBatchWorker < TaskerCore::StepHandler::Batchable
  def call(context)
    batch_ctx = get_batch_context(context)

    # Resume from checkpoint if present
    start = batch_ctx.has_checkpoint? ? batch_ctx.checkpoint_cursor : 0

    items.each_with_index do |item, idx|
      process_item(item)

      # Checkpoint every 1000 items
      if (idx + 1) % 1000 == 0
        checkpoint_yield(
          cursor: start + idx + 1,
          items_processed: idx + 1,
          accumulated_results: { partial: "data" }
        )
      end
    end

    batch_worker_complete(processed_count: items.size)
  end
end

Python:

class MyBatchWorker(StepHandler, Batchable):
    def call(self, context):
        batch_ctx = self.get_batch_context(context)

        # Resume from checkpoint if present
        start = batch_ctx.checkpoint_cursor if batch_ctx.has_checkpoint() else 0

        for idx, item in enumerate(items):
            self.process_item(item)

            # Checkpoint every 1000 items
            if (idx + 1) % 1000 == 0:
                self.checkpoint_yield(
                    cursor=start + idx + 1,
                    items_processed=idx + 1,
                    accumulated_results={"partial": "data"}
                )

        return self.batch_worker_success(processed_count=len(items))

TypeScript:

class MyBatchWorker extends BatchableHandler {
  async call(context: StepContext): Promise<StepHandlerResult> {
    const batchCtx = this.getBatchContext(context);

    // Resume from checkpoint if present
    const start = batchCtx.hasCheckpoint() ? batchCtx.checkpointCursor : 0;

    for (let idx = 0; idx < items.length; idx++) {
      await this.processItem(items[idx]);

      // Checkpoint every 1000 items
      if ((idx + 1) % 1000 === 0) {
        await this.checkpointYield({
          cursor: start + idx + 1,
          itemsProcessed: idx + 1,
          accumulatedResults: { partial: "data" }
        });
      }
    }

    return this.batchWorkerSuccess({ processedCount: items.length });
  }
}

BatchWorkerContext Checkpoint Accessors

All languages provide consistent accessors for checkpoint data:

| Accessor | Ruby | Python | TypeScript |
|---|---|---|---|
| Cursor position | checkpoint_cursor | checkpoint_cursor | checkpointCursor |
| Accumulated data | accumulated_results | accumulated_results | accumulatedResults |
| Has checkpoint? | has_checkpoint? | has_checkpoint() | hasCheckpoint() |
| Items processed | checkpoint_items_processed | checkpoint_items_processed | checkpointItemsProcessed |

FFI Contract

| Function | Description |
|---|---|
| checkpoint_yield_step_event(event_id, data) | Persist checkpoint and re-dispatch step |

Key Invariants

  1. Checkpoint-Persist-Then-Redispatch: Progress saved before re-dispatch
  2. Step Stays InProgress: No state machine transitions during yield
  3. Handler-Driven: Handlers decide when to checkpoint

See Batch Processing Guide - Checkpoint Yielding for comprehensive documentation.


Best Practices

1. Keep Handlers Focused

Each handler should do one thing well:

  • Validate input
  • Perform single operation
  • Return clear result

2. Use Error Classification

Always specify whether errors are retryable:

# Good - clear error classification
return self.failure("API rate limit", retryable=True)

# Bad - ambiguous error handling
raise Exception("API error")

3. Include Context in Errors

return StepHandlerResult.failure_handler_result(
    message="Database connection failed",
    error_type="database_error",
    retryable=True,
    metadata={
        "host": "db.example.com",
        "port": 5432,
        "connection_timeout_ms": 5000
    }
)

4. Use Structured Logging

log_info("Order processed", {
    "order_id": order_id,
    "total": total,
    "items_count": len(items)
})

5. Test Handler Isolation

Reset singletons between tests:

def setup_method(self):
    HandlerRegistry.reset_instance()
    EventBridge.reset_instance()

See Also