FFI Boundary Types Reference

Cross-language type harmonization for Rust, Ruby, Python, and TypeScript boundaries.

This document defines the canonical FFI boundary types that cross the Rust orchestration layer and the Ruby/Python/TypeScript worker implementations. These types are critical for correct serialization/deserialization between languages.

Overview

The tasker-core system uses FFI (Foreign Function Interface) to integrate Rust orchestration with Ruby, Python, and TypeScript step handlers. Data crosses this boundary via framework-native mechanisms: Magnus (Ruby), PyO3 (Python), and napi-rs (TypeScript). For complex types like StepExecutionResult, all three workers use serde-based deserialization: the language side builds a hash (Ruby), dict (Python), or object (TypeScript) with snake_case keys matching the Rust serde field names, and Rust then deserializes it via serde_magnus::deserialize(), depythonize(), or serde_json::from_value() respectively. These types must remain consistent across all four languages.
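
Because deserialization depends on key names matching the Rust serde field names, a mismatched key such as workerCount would not line up with worker_count on the Rust side. The sketch below is one way a worker could check a payload before handing it across the boundary. The non_snake_case_keys helper is hypothetical and not part of tasker-core; it uses only the Python standard library.

```python
import re
from typing import Any

# Lowercase words separated by single underscores, e.g. "is_no_op".
SNAKE_CASE = re.compile(r"^[a-z][a-z0-9]*(_[a-z0-9]+)*$")


def non_snake_case_keys(payload: Any, path: str = "") -> list[str]:
    """Hypothetical pre-flight check: list paths of keys that are not snake_case."""
    bad: list[str] = []
    if isinstance(payload, dict):
        for key, value in payload.items():
            here = f"{path}.{key}" if path else str(key)
            if not SNAKE_CASE.match(str(key)):
                bad.append(here)
            bad.extend(non_snake_case_keys(value, here))
    elif isinstance(payload, list):
        for index, item in enumerate(payload):
            bad.extend(non_snake_case_keys(item, f"{path}[{index}]"))
    return bad


good_payload = {"worker_count": 2, "cursor_configs": [{"batch_id": "001"}]}
bad_payload = {"workerCount": 2, "cursor_configs": [{"batchId": "001"}]}

good_report = non_snake_case_keys(good_payload)
bad_report = non_snake_case_keys(bad_payload)
```

Note that a check like this also walks into composite cursor values, whose keys are defined by the handler rather than by the Rust types, so a real implementation would likely skip those subtrees.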

Source of Truth: Rust types in tasker-shared/src/messaging/execution_types.rs and tasker-shared/src/models/core/batch_worker.rs.

Type Mapping

| Rust Type | Python Type | TypeScript Type |
|---|---|---|
| CursorConfig | RustCursorConfig | RustCursorConfig |
| BatchProcessingOutcome | BatchProcessingOutcome | BatchProcessingOutcome |
| BatchWorkerInputs | RustBatchWorkerInputs | RustBatchWorkerInputs |
| BatchMetadata | BatchMetadata | BatchMetadata |
| FailureStrategy | FailureStrategy | FailureStrategy |

CursorConfig

Cursor configuration for a single batch’s position and range.

Flexible Cursor Types

Unlike simple integer cursors, RustCursorConfig supports flexible cursor values:

  • Integer for record IDs: 123
  • String for timestamps: "2025-11-01T00:00:00Z"
  • Object for composite keys: {"page": 1, "offset": 0}

This enables cursor-based pagination across diverse data sources.
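
As a rough illustration of the three cursor shapes listed above, the sketch below builds one cursor of each kind and round-trips it through JSON. The make_cursor helper is hypothetical (it is not part of tasker_core); only the field names come from the CursorConfig definitions in this document.

```python
import json
from typing import Any


def make_cursor(batch_id: str, start: Any, end: Any, batch_size: int) -> dict[str, Any]:
    """Hypothetical helper: build a cursor dict with the documented field names."""
    return {
        "batch_id": batch_id,
        "start_cursor": start,
        "end_cursor": end,
        "batch_size": batch_size,
    }


cursors = [
    # Integer record IDs
    make_cursor("by_id", 123, 1123, 1000),
    # String timestamps
    make_cursor("by_time", "2025-11-01T00:00:00Z", "2025-11-02T00:00:00Z", 1000),
    # Object composite keys
    make_cursor("by_page", {"page": 1, "offset": 0}, {"page": 2, "offset": 0}, 1000),
]

# Each cursor survives a JSON round trip with its value type intact.
round_tripped = [json.loads(json.dumps(c)) for c in cursors]
cursor_types = [type(c["start_cursor"]).__name__ for c in round_tripped]
```

Handlers that consume a cursor therefore need to know which shape their data source uses, since the type system on the Python and TypeScript side (Any / unknown) does not narrow it for them.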

Rust Definition

// tasker-shared/src/messaging/execution_types.rs
pub struct CursorConfig {
    pub batch_id: String,
    pub start_cursor: serde_json::Value,  // Flexible type
    pub end_cursor: serde_json::Value,    // Flexible type
    pub batch_size: u32,
}

TypeScript Definition

// workers/typescript/src/types/batch.ts
export interface RustCursorConfig {
  batch_id: string;
  start_cursor: unknown;  // Flexible: number | string | object
  end_cursor: unknown;
  batch_size: number;
}

Python Definition

# workers/python/python/tasker_core/types.py
from typing import Any

from pydantic import BaseModel

class RustCursorConfig(BaseModel):
    batch_id: str
    start_cursor: Any  # Flexible: int | str | dict
    end_cursor: Any
    batch_size: int

JSON Wire Format

{
  "batch_id": "batch_001",
  "start_cursor": 0,
  "end_cursor": 1000,
  "batch_size": 1000
}

BatchProcessingOutcome

Discriminated union representing the outcome of a batchable step.

Rust Definition

// tasker-shared/src/messaging/execution_types.rs
#[serde(tag = "type", rename_all = "snake_case")]
pub enum BatchProcessingOutcome {
    NoBatches,
    CreateBatches {
        worker_template_name: String,
        worker_count: u32,
        cursor_configs: Vec<CursorConfig>,
        total_items: u64,
    },
}

TypeScript Definition

// workers/typescript/src/types/batch.ts
export interface NoBatchesOutcome {
  type: 'no_batches';
}

export interface CreateBatchesOutcome {
  type: 'create_batches';
  worker_template_name: string;
  worker_count: number;
  cursor_configs: RustCursorConfig[];
  total_items: number;
}

export type BatchProcessingOutcome = NoBatchesOutcome | CreateBatchesOutcome;

Python Definition

# workers/python/python/tasker_core/types.py
class NoBatchesOutcome(BaseModel):
    type: str = "no_batches"

class CreateBatchesOutcome(BaseModel):
    type: str = "create_batches"
    worker_template_name: str
    worker_count: int
    cursor_configs: list[RustCursorConfig]
    total_items: int

BatchProcessingOutcome = NoBatchesOutcome | CreateBatchesOutcome

JSON Wire Formats

NoBatches:

{
  "type": "no_batches"
}

CreateBatches:

{
  "type": "create_batches",
  "worker_template_name": "batch_worker_template",
  "worker_count": 2,
  "cursor_configs": [
    { "batch_id": "001", "start_cursor": 0, "end_cursor": 1000, "batch_size": 1000 },
    { "batch_id": "002", "start_cursor": 1000, "end_cursor": 2000, "batch_size": 1000 }
  ],
  "total_items": 2000
}
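
The "type" field is what lets a consumer tell the two variants apart. The sketch below shows one way to dispatch on that tag using only the Python standard library. The NoBatches and CreateBatches dataclasses and the parse_outcome function are hypothetical stand-ins written for this example; they are not the Pydantic models from tasker_core.

```python
import json
from dataclasses import dataclass
from typing import Any, Union


@dataclass
class NoBatches:
    """Stand-in for the no_batches variant (carries no fields)."""


@dataclass
class CreateBatches:
    """Stand-in for the create_batches variant."""
    worker_template_name: str
    worker_count: int
    cursor_configs: list[dict[str, Any]]
    total_items: int


def parse_outcome(raw: str) -> Union[NoBatches, CreateBatches]:
    """Hypothetical parser: dispatch on the "type" tag of the wire format."""
    data = json.loads(raw)
    tag = data.get("type")
    if tag == "no_batches":
        return NoBatches()
    if tag == "create_batches":
        # Everything except the tag maps directly onto the variant's fields.
        fields = {k: v for k, v in data.items() if k != "type"}
        return CreateBatches(**fields)
    raise ValueError(f"unknown outcome type: {tag!r}")


no_batches_outcome = parse_outcome('{"type": "no_batches"}')
create_outcome = parse_outcome(json.dumps({
    "type": "create_batches",
    "worker_template_name": "batch_worker_template",
    "worker_count": 2,
    "cursor_configs": [
        {"batch_id": "001", "start_cursor": 0, "end_cursor": 1000, "batch_size": 1000},
        {"batch_id": "002", "start_cursor": 1000, "end_cursor": 2000, "batch_size": 1000},
    ],
    "total_items": 2000,
}))
```

Rejecting unknown tags explicitly, rather than falling through to a default, makes a new variant added on the Rust side fail loudly in the worker instead of being silently misread.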

BatchWorkerInputs

Initialization inputs for batch worker instances, stored in workflow_steps.inputs.

Rust Definition

// tasker-shared/src/models/core/batch_worker.rs
pub struct BatchWorkerInputs {
    pub cursor: CursorConfig,
    pub batch_metadata: BatchMetadata,
    pub is_no_op: bool,
}

pub struct BatchMetadata {
    // checkpoint_interval removed - handlers decide when to checkpoint
    pub cursor_field: String,
    pub failure_strategy: FailureStrategy,
}

// Appears on the wire as a snake_case string, e.g. "continue_on_failure"
pub enum FailureStrategy {
    ContinueOnFailure,
    FailFast,
    Isolate,
}

TypeScript Definition

// workers/typescript/src/types/batch.ts
export type FailureStrategy = 'continue_on_failure' | 'fail_fast' | 'isolate';

export interface BatchMetadata {
  // checkpoint_interval removed - handlers decide when to checkpoint
  cursor_field: string;
  failure_strategy: FailureStrategy;
}

export interface RustBatchWorkerInputs {
  cursor: RustCursorConfig;
  batch_metadata: BatchMetadata;
  is_no_op: boolean;
}

Python Definition

# workers/python/python/tasker_core/types.py
from enum import Enum

class FailureStrategy(str, Enum):
    CONTINUE_ON_FAILURE = "continue_on_failure"
    FAIL_FAST = "fail_fast"
    ISOLATE = "isolate"

class BatchMetadata(BaseModel):
    # checkpoint_interval removed - handlers decide when to checkpoint
    cursor_field: str
    failure_strategy: FailureStrategy

class RustBatchWorkerInputs(BaseModel):
    cursor: RustCursorConfig
    batch_metadata: BatchMetadata
    is_no_op: bool

JSON Wire Format

{
  "cursor": {
    "batch_id": "batch_001",
    "start_cursor": 0,
    "end_cursor": 1000,
    "batch_size": 1000
  },
  "batch_metadata": {
    "cursor_field": "id",
    "failure_strategy": "continue_on_failure"
  },
  "is_no_op": false
}
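
To show how the failure_strategy string in this payload maps onto the enum, the sketch below reads it from a dict shaped like the wire format above. The read_failure_strategy helper and REQUIRED_KEYS constant are hypothetical; the enum is redefined locally so the example runs without tasker_core installed.

```python
from enum import Enum
from typing import Any


class FailureStrategy(str, Enum):
    CONTINUE_ON_FAILURE = "continue_on_failure"
    FAIL_FAST = "fail_fast"
    ISOLATE = "isolate"


# Top-level fields of BatchWorkerInputs as documented above.
REQUIRED_KEYS = {"cursor", "batch_metadata", "is_no_op"}


def read_failure_strategy(inputs: dict[str, Any]) -> FailureStrategy:
    """Hypothetical helper: check top-level keys, then decode the strategy string."""
    missing = REQUIRED_KEYS - set(inputs)
    if missing:
        raise KeyError(f"missing keys: {sorted(missing)}")
    # Enum lookup by value raises ValueError for strings outside the three variants.
    return FailureStrategy(inputs["batch_metadata"]["failure_strategy"])


inputs = {
    "cursor": {
        "batch_id": "batch_001",
        "start_cursor": 0,
        "end_cursor": 1000,
        "batch_size": 1000,
    },
    "batch_metadata": {
        "cursor_field": "id",
        "failure_strategy": "continue_on_failure",
    },
    "is_no_op": False,
}
strategy = read_failure_strategy(inputs)
```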

BatchAggregationResult

Standardized result from aggregating multiple batch worker results.

Cross-Language Standard

All three languages produce identical aggregation results:

| Field | Type | Description |
|---|---|---|
| total_processed | int | Items processed across all batches |
| total_succeeded | int | Items that succeeded |
| total_failed | int | Items that failed |
| total_skipped | int | Items that were skipped |
| batch_count | int | Number of batch workers that ran |
| success_rate | float | Success rate (0.0 to 1.0) |
| errors | array | Collected errors (limited to 100) |
| error_count | int | Total error count |
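
To make the relationship between these fields concrete, here is a minimal sketch of how such a summary could be computed. It is not the tasker_core implementation: the per-worker keys (processed, succeeded, failed, skipped, errors), the aggregate_sketch name, and the definition of success_rate as succeeded divided by processed are all assumptions made for illustration. Only the output field names and the 100-error cap come from the table above.

```python
from typing import Any

MAX_ERRORS = 100  # the table above caps collected errors at 100


def aggregate_sketch(worker_results: list[dict[str, Any]]) -> dict[str, Any]:
    """Illustrative only: the per-worker keys read here are assumed, not documented."""
    processed = sum(r.get("processed", 0) for r in worker_results)
    succeeded = sum(r.get("succeeded", 0) for r in worker_results)
    failed = sum(r.get("failed", 0) for r in worker_results)
    skipped = sum(r.get("skipped", 0) for r in worker_results)
    errors = [e for r in worker_results for e in r.get("errors", [])]
    return {
        "total_processed": processed,
        "total_succeeded": succeeded,
        "total_failed": failed,
        "total_skipped": skipped,
        "batch_count": len(worker_results),
        # Assumed definition: succeeded / processed, with 0.0 for an empty run.
        "success_rate": succeeded / processed if processed else 0.0,
        # The list is truncated, but error_count still reports the full total.
        "errors": errors[:MAX_ERRORS],
        "error_count": len(errors),
    }


summary = aggregate_sketch([
    {"processed": 100, "succeeded": 90, "failed": 10, "skipped": 0,
     "errors": ["bad row"] * 10},
    {"processed": 100, "succeeded": 60, "failed": 30, "skipped": 10,
     "errors": ["timeout"] * 120},
])
```

Keeping error_count separate from the truncated errors list means a consumer can still tell that more than 100 errors occurred even though only the first 100 are carried in the result.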

Usage Examples

TypeScript:

import { aggregateBatchResults } from 'tasker-core';

const workerResults = Object.values(context.previousResults)
  .filter(r => r?.batch_worker);
const summary = aggregateBatchResults(workerResults);
return this.success(summary);

Python:

from tasker_core.types import aggregate_batch_results

worker_results = [
    context.get_dependency_result(f"worker_{i}")
    for i in range(batch_count)
]
summary = aggregate_batch_results(worker_results)
return self.success(summary.model_dump())

Factory Functions

Creating BatchProcessingOutcome

TypeScript:

import { noBatches, createBatches, RustCursorConfig } from 'tasker-core';

// No batches needed
const outcome1 = noBatches();

// Create batch workers
const configs: RustCursorConfig[] = [
  { batch_id: '001', start_cursor: 0, end_cursor: 1000, batch_size: 1000 },
  { batch_id: '002', start_cursor: 1000, end_cursor: 2000, batch_size: 1000 },
];
const outcome2 = createBatches('process_batch', 2, configs, 2000);

Python:

from tasker_core.types import no_batches, create_batches, RustCursorConfig

# No batches needed
outcome1 = no_batches()

# Create batch workers
configs = [
    RustCursorConfig(batch_id="001", start_cursor=0, end_cursor=1000, batch_size=1000),
    RustCursorConfig(batch_id="002", start_cursor=1000, end_cursor=2000, batch_size=1000),
]
outcome2 = create_batches("process_batch", 2, configs, 2000)

Type Guards (TypeScript)

import {
  BatchProcessingOutcome,
  isNoBatches,
  isCreateBatches
} from 'tasker-core';

function handleOutcome(outcome: BatchProcessingOutcome): void {
  if (isNoBatches(outcome)) {
    console.log('No batches needed');
    return;
  }

  if (isCreateBatches(outcome)) {
    console.log(`Creating ${outcome.worker_count} workers`);
    console.log(`Total items: ${outcome.total_items}`);
  }
}

Migration Notes

From Legacy Types

If migrating from older batch processing types:

  1. CursorConfig → RustCursorConfig: The new type adds a batch_id field and uses flexible cursor types (unknown/Any) instead of fixed number/int.

  2. Inline batch_processing_outcome → BatchProcessingOutcome: Use the discriminated union type with factory functions instead of building JSON manually.

  3. Manual aggregation → aggregateBatchResults: Use the standardized aggregation function for consistent cross-language behavior.

Backwards Compatibility

The legacy CursorConfig type (with number/int cursors) is preserved for simple use cases. Use RustCursorConfig when:

  • Working with Rust orchestration inputs
  • Needing flexible cursor types (timestamps, UUIDs, composites)
  • Building BatchProcessingOutcome structures
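
As a sketch of what moving from the legacy shape to the Rust-aligned shape might involve, the example below adds a batch_id to a legacy-style cursor. The upgrade_cursor helper is hypothetical, and the legacy field names (start_cursor, end_cursor, batch_size) are assumed from the migration notes above rather than taken from the legacy type's definition.

```python
from typing import Any


def upgrade_cursor(legacy: dict[str, int], batch_id: str) -> dict[str, Any]:
    """Hypothetical migration helper; the legacy field names are assumed."""
    return {
        # New in the Rust-aligned shape: every cursor carries its batch ID.
        "batch_id": batch_id,
        # Integer cursors remain valid values for the flexible cursor fields.
        "start_cursor": legacy["start_cursor"],
        "end_cursor": legacy["end_cursor"],
        "batch_size": legacy["batch_size"],
    }


legacy_cursor = {"start_cursor": 0, "end_cursor": 1000, "batch_size": 1000}
upgraded = upgrade_cursor(legacy_cursor, batch_id="batch_001")
```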