
Worker Crates Overview

Last Updated: 2025-12-27
Audience: Developers, Architects, Operators
Status: Active
Related Docs: Worker Event Systems | Worker Actors



The tasker-core workspace provides four worker implementations for executing workflow step handlers. Each implementation targets different deployment scenarios and developer ecosystems while sharing the same core Rust foundation.

Quick Navigation

| Document | Description |
|---|---|
| API Convergence Matrix | Quick reference for aligned APIs across languages |
| Example Handlers | Side-by-side handler examples |
| Patterns and Practices | Common patterns across all workers |
| Rust Worker | Native Rust implementation |
| Ruby Worker | Ruby gem for Rails integration |
| Python Worker | Python package for data pipelines |
| TypeScript Worker | TypeScript/JS for Bun/Node/Deno |

Overview

Four Workers, One Foundation

All workers share the same Rust core (tasker-worker crate) for orchestration, queueing, and state management. The language-specific workers add handler execution in their respective runtimes.

┌─────────────────────────────────────────────────────────────────────────────┐
│                             WORKER ARCHITECTURE                             │
└─────────────────────────────────────────────────────────────────────────────┘

                              PostgreSQL + PGMQ
                                      │
                                      ▼
                    ┌─────────────────────────────┐
                    │   Rust Core (tasker-worker) │
                    │   ───────────────────────── │
                    │   • Queue Management        │
                    │   • State Machines          │
                    │   • Orchestration           │
                    │   • Actor System            │
                    └─────────────────────────────┘
                                      │
          ┌───────────────┬───────────┼───────────┬───────────────┐
          │               │           │           │               │
          ▼               ▼           ▼           ▼               ▼
    ┌───────────┐   ┌───────────┐   ┌───────────┐   ┌─────────────┐
    │   Rust    │   │   Ruby    │   │  Python   │   │ TypeScript  │
    │  Worker   │   │  Worker   │   │  Worker   │   │   Worker    │
    │───────────│   │───────────│   │───────────│   │─────────────│
    │ Native    │   │ FFI Bridge│   │ FFI Bridge│   │ FFI Bridge  │
    │ Handlers  │   │ + Gem     │   │ + Package │   │Bun/Node/Deno│
    └───────────┘   └───────────┘   └───────────┘   └─────────────┘

Comparison Table

| Feature | Rust | Ruby | Python | TypeScript |
|---|---|---|---|---|
| Performance | Native | GVL-limited | GIL-limited | V8/Bun native |
| Integration | Standalone | Rails/Rack apps | Data pipelines | Node/Bun/Deno apps |
| Handler Style | Async traits | Class-based | ABC-based | Class-based |
| Concurrency | Tokio async | Thread + FFI poll | Thread + FFI poll | Event loop + FFI poll |
| Deployment | Binary | Gem + Server | Package + Server | Package + Server |
| Headless Mode | N/A | Library embed | Library embed | Library embed |
| Runtimes | - | MRI | CPython | Bun, Node.js, Deno |

When to Use Each

Rust Worker - Best for:

  • Maximum throughput requirements
  • Resource-constrained environments
  • Standalone microservices
  • Performance-critical handlers

Ruby Worker - Best for:

  • Rails/Ruby applications
  • ActiveRecord/ORM integration
  • Existing Ruby codebases
  • Quick prototyping with Ruby ecosystem

Python Worker - Best for:

  • Data processing pipelines
  • ML/AI integration
  • Scientific computing workflows
  • Python-native team preferences

TypeScript Worker - Best for:

  • Modern JavaScript/TypeScript applications
  • Full-stack Node.js teams
  • Edge computing with Bun or Deno
  • React/Vue/Angular backend services
  • Multi-runtime deployment flexibility

Deployment Modes

Server Mode

All workers can run as standalone servers:

Rust:

cargo run -p workers-rust

Ruby:

cd workers/ruby
./bin/server.rb

Python:

cd workers/python
python bin/server.py

TypeScript (Bun):

cd workers/typescript
bun run bin/server.ts

TypeScript (Node.js):

cd workers/typescript
npx tsx bin/server.ts

Headless/Embedded Mode (Ruby, Python & TypeScript)

Ruby, Python, and TypeScript workers can be embedded into existing applications without running the HTTP server. Headless mode is controlled via TOML configuration, not bootstrap parameters.

TOML Configuration (e.g., config/tasker/base/worker.toml):

[web]
enabled = false  # Disables HTTP server for headless/embedded mode

Ruby (in Rails):

# config/initializers/tasker.rb
require 'tasker_core'

# Bootstrap worker (web server disabled via TOML config)
TaskerCore::Worker::Bootstrap.start!

# Register handlers
TaskerCore::Registry::HandlerRegistry.instance.register_handler(
  'MyHandler',
  MyHandler
)

Python (in application):

from tasker_core import bootstrap_worker, HandlerRegistry
from tasker_core.types import BootstrapConfig

# Bootstrap worker (web server disabled via TOML config)
config = BootstrapConfig(namespace="my-app")
bootstrap_worker(config)

# Register handlers
registry = HandlerRegistry.instance()
registry.register("my_handler", MyHandler)

TypeScript (in application):

import { createRuntime, HandlerRegistry, EventEmitter, EventPoller, StepExecutionSubscriber } from '@tasker-systems/tasker';

// Bootstrap worker (web server disabled via TOML config)
const runtime = createRuntime();
await runtime.load('/path/to/libtasker_ts.dylib');
runtime.bootstrapWorker({ namespace: 'my-app' });

// Register handlers
const registry = new HandlerRegistry();
registry.register('my_handler', MyHandler);

// Start event processing
const emitter = new EventEmitter();
const subscriber = new StepExecutionSubscriber(emitter, registry, runtime, {});
subscriber.start();

const poller = new EventPoller(runtime, emitter);
poller.start();

Core Concepts

1. Handler Registration

All workers use a registry pattern for handler discovery:

                    ┌─────────────────────┐
                    │  HandlerRegistry    │
                    │  (Singleton)        │
                    └─────────────────────┘
                              │
              ┌───────────────┼───────────────┐
              │               │               │
              ▼               ▼               ▼
         ┌─────────┐    ┌─────────┐    ┌─────────┐
         │Handler A│    │Handler B│    │Handler C│
         └─────────┘    └─────────┘    └─────────┘
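The registry pattern above can be sketched as a small singleton that maps handler names to handler classes. This is an illustrative sketch only; the class and method names (`instance`, `register`, `resolve`) mirror the examples elsewhere in this document but are not the exact tasker-core API.

```python
# Hypothetical minimal handler registry: a singleton mapping
# string names to handler classes, resolved by name at run time.
class HandlerRegistry:
    _instance = None

    @classmethod
    def instance(cls):
        # Lazily create the single shared registry.
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def __init__(self):
        self._handlers = {}

    def register(self, name, handler_cls):
        self._handlers[name] = handler_cls

    def resolve(self, name):
        return self._handlers[name]

class MyHandler:
    def call(self, context):
        return {"ok": True}

HandlerRegistry.instance().register("my_handler", MyHandler)
handler_cls = HandlerRegistry.instance().resolve("my_handler")
print(handler_cls().call({}))  # {'ok': True}
```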

2. Event Flow

Step events flow through a consistent pipeline:

1. PGMQ Queue → Event received
2. Worker claims step (atomic)
3. Handler resolved by name
4. Handler.call(context) executed
5. Result sent to completion channel
6. Orchestration receives result
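Steps 3-5 of the pipeline above can be sketched in a few lines. The names here (`HANDLERS`, `process_event`, the event dict shape) are illustrative stand-ins, not the real tasker-core internals.

```python
# Hypothetical sketch of steps 3-5: resolve a handler by name,
# execute it, and push the result onto a completion channel.
import queue

completion_channel = queue.Queue()
HANDLERS = {}  # name -> handler class (stands in for the registry)

class EchoHandler:
    def call(self, context):
        return {"success": True, "result": context}

HANDLERS["echo"] = EchoHandler

def process_event(event):
    handler_cls = HANDLERS[event["handler"]]       # 3. resolve by name
    result = handler_cls().call(event["context"])  # 4. Handler.call(context)
    completion_channel.put(result)                 # 5. send to completion channel

process_event({"handler": "echo", "context": {"step_uuid": "abc-123"}})
print(completion_channel.get())
```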

3. Error Classification

All workers distinguish between:

  • Retryable Errors: Transient failures → Re-enqueue step
  • Permanent Errors: Unrecoverable → Mark step failed
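The distinction can be sketched as two exception types and a dispatcher. The exception names and the `execute_with_classification` helper are hypothetical, shown only to illustrate how retryable and permanent failures route differently.

```python
# Illustrative error classification; these exception names are
# hypothetical, not the real tasker-core types.
class RetryableError(Exception):
    """Transient failure: the step should be re-enqueued."""

class PermanentError(Exception):
    """Unrecoverable failure: the step should be marked failed."""

def execute_with_classification(handler, context):
    try:
        return ("completed", handler(context))
    except RetryableError as e:
        return ("re_enqueue", str(e))  # transient -> re-enqueue the step
    except PermanentError as e:
        return ("failed", str(e))      # unrecoverable -> mark step failed

def flaky(_ctx):
    raise RetryableError("connection reset")

print(execute_with_classification(flaky, {}))  # ('re_enqueue', 'connection reset')
```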

4. Graceful Shutdown

All workers handle shutdown signals (SIGTERM, SIGINT):

1. Signal received
2. Stop accepting new work
3. Complete in-flight handlers
4. Flush completion channel
5. Shutdown Rust foundation
6. Exit cleanly
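The sequence above can be sketched with the standard library's signal handling. The `worker_loop` and `in_flight` names are illustrative, not the real tasker-core implementation.

```python
# Minimal sketch of graceful shutdown using Python's stdlib signal module.
import signal
import threading

shutting_down = threading.Event()

def handle_shutdown(signum, frame):
    # 1-2. Signal received: stop accepting new work.
    shutting_down.set()

signal.signal(signal.SIGTERM, handle_shutdown)
signal.signal(signal.SIGINT, handle_shutdown)

def worker_loop(in_flight):
    while not shutting_down.is_set():
        # Claim and execute new work here until a signal arrives.
        break
    # 3. Complete in-flight handlers before exiting.
    for job in in_flight:
        job()
    # 4-6. Flush the completion channel, shut down the Rust
    # foundation, and exit cleanly.

worker_loop([])
```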

Configuration

Environment Variables

Common across all workers:

| Variable | Description | Default |
|---|---|---|
| DATABASE_URL | PostgreSQL connection string | Required |
| TASKER_ENV | Environment (test/development/production) | development |
| TASKER_CONFIG_PATH | Path to TOML configuration | Auto-detected |
| TASKER_TEMPLATE_PATH | Path to task templates | Auto-detected |
| TASKER_NAMESPACE | Worker namespace for queue isolation | default |
| RUST_LOG | Log level (trace/debug/info/warn/error) | info |
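A small sketch of resolving these variables with their documented defaults; the `load_config` helper is illustrative, not part of tasker-core.

```python
# Hypothetical config loader applying the documented defaults.
# DATABASE_URL is required and has no default.
import os

def load_config(env=os.environ):
    if "DATABASE_URL" not in env:
        raise SystemExit("DATABASE_URL is required and has no default")
    return {
        "database_url": env["DATABASE_URL"],
        "tasker_env": env.get("TASKER_ENV", "development"),
        "namespace": env.get("TASKER_NAMESPACE", "default"),
        "log_level": env.get("RUST_LOG", "info"),
    }

cfg = load_config({"DATABASE_URL": "postgres://localhost/tasker"})
print(cfg["tasker_env"])  # development
```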

Language-Specific

Ruby:

| Variable | Description |
|---|---|
| RUBY_GC_HEAP_GROWTH_FACTOR | GC tuning for production |

Python:

| Variable | Description |
|---|---|
| PYTHON_HANDLER_PATH | Path for handler auto-discovery |

Handler Types

All workers support specialized handler types:

StepHandler (Base)

Basic step execution (Python):

class MyHandler(StepHandler):
    handler_name = "my_handler"

    def call(self, context):
        return self.success({"result": "done"})

ApiHandler

HTTP/REST API integration with automatic error classification (Ruby):

class FetchDataHandler < TaskerCore::StepHandler::Api
  def call(context)
    user_id = context.get_task_field('user_id')
    response = connection.get("/users/#{user_id}")
    process_response(response)
    success(result: response.body)
  end
end

DecisionHandler

Dynamic workflow routing:

class RouteHandler(DecisionHandler):
    handler_name = "route_handler"

    def call(self, context):
        if context.input_data["amount"] < 1000:
            return self.route_to_steps(["auto_approve"])
        return self.route_to_steps(["manager_approval"])

Batchable

Large dataset processing. Note: Ruby uses subclass inheritance, Python uses mixin:

Ruby (subclass of Base):

class CsvBatchProcessorHandler < TaskerCore::StepHandler::Batchable
  def call(context)
    batch_ctx = get_batch_context(context)
    no_op_result = handle_no_op_worker(batch_ctx)
    return no_op_result if no_op_result

    # Process batch using batch_ctx.start_cursor, batch_ctx.end_cursor
    batch_worker_complete(processed_count: batch_ctx.batch_size)
  end
end

Python (mixin):

class CsvBatchProcessor(StepHandler, Batchable):
    handler_name = "csv_batch_processor"

    def call(self, context: StepContext) -> StepHandlerResult:
        batch_ctx = self.get_batch_context(context)
        if batch_ctx is None:
            return self.failure(message="No batch context", error_type="missing_context")
        # Process batch using batch_ctx.start_cursor, batch_ctx.end_cursor
        batch_size = batch_ctx.cursor_config.end_cursor - batch_ctx.cursor_config.start_cursor
        return self.batch_worker_success(items_processed=batch_size)

Quick Start

Rust

# Build and run
cd workers/rust
cargo run

# With custom configuration
TASKER_CONFIG_PATH=/path/to/config.toml cargo run

Ruby

# Install dependencies
cd workers/ruby
bundle install
bundle exec rake compile

# Run server
./bin/server.rb

Python

# Install dependencies
cd workers/python
uv sync
uv run maturin develop

# Run server
python bin/server.py

TypeScript

# Install dependencies
cd workers/typescript
bun install
cargo build --release -p tasker-ts

# Run server (Bun)
bun run bin/server.ts

# Run server (Node.js)
npx tsx bin/server.ts

# Run server (Deno)
deno run --allow-ffi --allow-env --allow-net bin/server.ts

Monitoring

Health Checks

All workers expose health status:

Python:

from tasker_core import get_health_check
health = get_health_check()

Ruby:

health = TaskerCore::FFI.health_check

Metrics

Common metrics available:

| Metric | Description |
|---|---|
| pending_count | Events awaiting processing |
| in_flight_count | Events being processed |
| completed_count | Successfully completed |
| failed_count | Failed events |
| starvation_detected | Processing bottleneck |

Logging

All workers use structured logging:

2025-01-15T10:30:00Z [INFO] python-worker: Processing step step_uuid=abc-123 handler=process_order
2025-01-15T10:30:01Z [INFO] python-worker: Step completed step_uuid=abc-123 success=true duration_ms=150

Architecture Deep Dive

For detailed architectural documentation:


See Also