States and Lifecycles
Last Updated: 2025-10-10 Audience: All Status: Active Related Docs: Documentation Hub | Events and Commands | Task Readiness & Execution
← Back to Documentation Hub
This document provides comprehensive documentation of the state machine architecture in tasker-core, covering both task and workflow step lifecycles, their state transitions, and the underlying persistence mechanisms.
Overview
The tasker-core system implements a sophisticated dual-state-machine architecture:
- Task State Machine: Manages overall workflow orchestration with 12 comprehensive states
- Workflow Step State Machine: Manages individual step execution with 8 states including orchestration queuing
Both state machines work in coordination to provide atomic, auditable, and resilient workflow execution with proper event-driven communication between orchestration and worker systems.
Task State Machine Architecture
Task State Definitions
The task state machine implements 12 comprehensive states as defined in tasker-shared/src/state_machine/states.rs:
Initial States
Pending: Created but not started (default initial state)Initializing: Discovering initial ready steps and setting up task context
Active Processing States
EnqueuingSteps: Actively enqueuing ready steps to worker queuesStepsInProcess: Steps are being processed by workers (orchestration monitoring)EvaluatingResults: Processing results from completed steps and determining next actions
Waiting States
WaitingForDependencies: No ready steps, waiting for dependencies to be satisfiedWaitingForRetry: Waiting for retry timeout before attempting failed steps againBlockedByFailures: Has failures that prevent progress (manual intervention may be needed)
Terminal States
Complete: All steps completed successfully (terminal)Error: Task failed permanently (terminal)Cancelled: Task was cancelled (terminal)ResolvedManually: Manually resolved by operator (terminal)
Task State Properties
Each state has key properties that drive system behavior:
#![allow(unused)]
fn main() {
impl TaskState {
pub fn is_terminal(&self) -> bool // Cannot transition further
pub fn requires_ownership(&self) -> bool // Processor ownership required
pub fn is_active(&self) -> bool // Currently being processed
pub fn is_waiting(&self) -> bool // Waiting for external conditions
pub fn can_be_processed(&self) -> bool // Available for orchestration pickup
}
}
Ownership-Required States: Initializing, EnqueuingSteps, StepsInProcess, EvaluatingResults
Processable States: Pending, WaitingForDependencies, WaitingForRetry
Task Lifecycle Flow
stateDiagram-v2
[*] --> Pending
%% Initial Flow
Pending --> Initializing : Start
%% From Initializing
Initializing --> EnqueuingSteps : ReadyStepsFound(count)
Initializing --> Complete : NoStepsFound
Initializing --> WaitingForDependencies : NoDependenciesReady
%% Processing Flow
EnqueuingSteps --> StepsInProcess : StepsEnqueued(uuids)
EnqueuingSteps --> Error : EnqueueFailed(error)
StepsInProcess --> EvaluatingResults : AllStepsCompleted
StepsInProcess --> EvaluatingResults : StepCompleted(uuid)
StepsInProcess --> WaitingForRetry : StepFailed(uuid)
%% Result Evaluation
EvaluatingResults --> Complete : AllStepsSuccessful
EvaluatingResults --> EnqueuingSteps : ReadyStepsFound(count)
EvaluatingResults --> WaitingForDependencies : NoDependenciesReady
EvaluatingResults --> BlockedByFailures : PermanentFailure(error)
%% Waiting States
WaitingForDependencies --> EvaluatingResults : DependenciesReady
WaitingForRetry --> EnqueuingSteps : RetryReady
%% Problem Resolution
BlockedByFailures --> Error : GiveUp
BlockedByFailures --> ResolvedManually : ManualResolution
%% Cancellation (from any non-terminal state)
Pending --> Cancelled : Cancel
Initializing --> Cancelled : Cancel
EnqueuingSteps --> Cancelled : Cancel
StepsInProcess --> Cancelled : Cancel
EvaluatingResults --> Cancelled : Cancel
WaitingForDependencies --> Cancelled : Cancel
WaitingForRetry --> Cancelled : Cancel
BlockedByFailures --> Cancelled : Cancel
%% Legacy Support
Error --> Pending : Reset
%% Terminal States
Complete --> [*]
Error --> [*]
Cancelled --> [*]
ResolvedManually --> [*]
Task Event System
Task state transitions are driven by events defined in tasker-shared/src/state_machine/events.rs:
Lifecycle Events
Start: Begin task processingCancel: Cancel task executionGiveUp: Abandon task (BlockedByFailures -> Error)ManualResolution: Manually resolve task
Discovery Events
ReadyStepsFound(count): Ready steps discovered during initialization/evaluationNoStepsFound: No steps defined - task can complete immediatelyNoDependenciesReady: Dependencies not satisfied - wait requiredDependenciesReady: Dependencies now ready - can proceed
Processing Events
StepsEnqueued(vec<Uuid>): Steps successfully queued for workersEnqueueFailed(error): Failed to enqueue stepsStepCompleted(uuid): Individual step completedStepFailed(uuid): Individual step failedAllStepsCompleted: All current batch steps finishedAllStepsSuccessful: All steps completed successfully
System Events
PermanentFailure(error): Unrecoverable failureRetryReady: Retry timeout expiredTimeout: Operation timeout occurredProcessorCrashed: Processor became unavailable
Processor Ownership
The task state machine implements processor ownership for active states to prevent race conditions:
#![allow(unused)]
fn main() {
// Ownership validation in task_state_machine.rs
if target_state.requires_ownership() {
let current_processor = self.get_current_processor().await?;
TransitionGuard::check_ownership(target_state, current_processor, self.processor_uuid)?;
}
}
Ownership Rules:
- States requiring ownership:
Initializing,EnqueuingSteps,StepsInProcess,EvaluatingResults - Processor UUID stored in
tasker.task_transitions.processor_uuidcolumn - Atomic ownership claiming prevents concurrent processing
- Ownership validated on each transition attempt
Workflow Step State Machine Architecture
Step State Definitions
The workflow step state machine implements 9 states for individual step execution:
Processing Pipeline States
Pending: Initial state when step is createdEnqueued: Queued for processing but not yet claimed by workerInProgress: Currently being executed by a workerEnqueuedForOrchestration: Worker completed, queued for orchestration processingEnqueuedAsErrorForOrchestration: Worker failed, queued for orchestration error processing
Waiting States
WaitingForRetry: Step failed with retryable error, waiting for backoff period before retry
Terminal States
Complete: Step completed successfully (after orchestration processing)Error: Step failed permanently (non-retryable or max retries exceeded)Cancelled: Step was cancelledResolvedManually: Step was manually resolved by operator
State Machine Evolution
Previously, the Error state was used for both retryable and permanent failures. The introduction of WaitingForRetry created a semantic change:
- Before:
Error= any failure (retryable or permanent) - After:
Error= permanent failure only,WaitingForRetry= retryable failure awaiting backoff
This change required updates to:
get_step_readiness_status()to recognizeWaitingForRetryas a ready-eligible stateget_task_execution_context()to properly detect blocked vs recovering tasks- Error classification logic to distinguish permanent from retryable errors
Step State Properties
#![allow(unused)]
fn main() {
impl WorkflowStepState {
pub fn is_terminal(&self) -> bool // No further transitions
pub fn is_error(&self) -> bool // In error state (may allow retry)
pub fn is_active(&self) -> bool // Being processed by worker
pub fn is_in_processing_pipeline(&self) -> bool // In execution pipeline
pub fn is_ready_for_claiming(&self) -> bool // Available for worker claim
pub fn satisfies_dependencies(&self) -> bool // Can satisfy other step dependencies
}
}
Step Lifecycle Flow
stateDiagram-v2
[*] --> Pending
%% Main Execution Path
Pending --> Enqueued : Enqueue
Enqueued --> InProgress : Start (worker claims)
InProgress --> EnqueuedForOrchestration : EnqueueForOrchestration(success)
EnqueuedForOrchestration --> Complete : Complete(results) [orchestration]
%% Error Handling Path
InProgress --> EnqueuedAsErrorForOrchestration : EnqueueForOrchestration(error)
EnqueuedAsErrorForOrchestration --> WaitingForRetry : WaitForRetry(error) [retryable]
EnqueuedAsErrorForOrchestration --> Error : Fail(error) [permanent/max retries]
%% Retry Path
WaitingForRetry --> Pending : Retry (after backoff)
%% Legacy Direct Path (deprecated)
InProgress --> Complete : Complete(results) [direct - legacy]
InProgress --> Error : Fail(error) [direct - legacy]
%% Legacy Backward Compatibility
Pending --> InProgress : Start [legacy]
%% Direct Failure Paths (error before worker processing)
Pending --> Error : Fail(error)
Enqueued --> Error : Fail(error)
%% Cancellation Paths
Pending --> Cancelled : Cancel
Enqueued --> Cancelled : Cancel
InProgress --> Cancelled : Cancel
EnqueuedForOrchestration --> Cancelled : Cancel
EnqueuedAsErrorForOrchestration --> Cancelled : Cancel
WaitingForRetry --> Cancelled : Cancel
Error --> Cancelled : Cancel
%% Manual Resolution (from any state)
Pending --> ResolvedManually : ResolveManually
Enqueued --> ResolvedManually : ResolveManually
InProgress --> ResolvedManually : ResolveManually
EnqueuedForOrchestration --> ResolvedManually : ResolveManually
EnqueuedAsErrorForOrchestration --> ResolvedManually : ResolveManually
WaitingForRetry --> ResolvedManually : ResolveManually
Error --> ResolvedManually : ResolveManually
%% Terminal States
Complete --> [*]
Error --> [*]
Cancelled --> [*]
ResolvedManually --> [*]
Step Event System
Step transitions are driven by StepEvent types:
Processing Events
Enqueue: Queue step for worker processingStart: Begin step execution (worker claims step)EnqueueForOrchestration(results): Worker completes, queues for orchestrationComplete(results): Mark step complete (from orchestration or legacy direct)Fail(error): Mark step as permanently failedWaitForRetry(error): Mark step for retry after backoff
Control Events
Cancel: Cancel step executionResolveManually: Manual operator resolutionRetry: Retry step from WaitingForRetry or Error state
Step Execution Flow Integration
The step state machine integrates tightly with the task state machine:
- Task Discovers Ready Steps:
TaskEvent::ReadyStepsFound(count)-> Task moves toEnqueuingSteps - Steps Get Enqueued:
StepEvent::Enqueue-> Steps move toEnqueuedstate - Workers Claim Steps:
StepEvent::Start-> Steps move toInProgress - Workers Complete Steps:
StepEvent::EnqueueForOrchestration(results)-> Steps move toEnqueuedForOrchestration - Orchestration Processes Results:
StepEvent::Complete(results)-> Steps move toComplete - Task Evaluates Progress:
TaskEvent::StepCompleted(uuid)-> Task moves toEvaluatingResults - Task Completes or Continues: Based on remaining steps -> Task moves to
Completeor back toEnqueuingSteps
Guard Conditions and Validation
Both state machines implement comprehensive guard conditions in tasker-shared/src/state_machine/guards.rs:
Task Guards
TransitionGuard
- Validates all task state transitions
- Prevents invalid state combinations
- Enforces terminal state immutability
- Supports legacy transition compatibility
Ownership Validation
- Checks processor ownership for ownership-required states
- Prevents concurrent task processing
- Allows ownership claiming for unowned tasks
Step Guards
StepDependenciesMetGuard
- Validates all step dependencies are satisfied
- Delegates to
WorkflowStep::dependencies_met() - Prevents premature step execution
StepNotInProgressGuard
- Ensures step is not already being processed
- Prevents duplicate worker claims
- Validates step availability
Retry Guards
StepCanBeRetriedGuard: Validates step is in Error state- Checks retry limits and conditions
- Prevents infinite retry loops
Orchestration Guards
StepCanBeEnqueuedForOrchestrationGuard: Step must be InProgressStepCanBeCompletedFromOrchestrationGuard: Step must be EnqueuedForOrchestrationStepCanBeFailedFromOrchestrationGuard: Step must be EnqueuedForOrchestration
Persistence Layer Architecture
Delegation Pattern
The persistence layer in tasker-shared/src/state_machine/persistence.rs implements a delegation pattern to the model layer:
#![allow(unused)]
fn main() {
// TaskTransitionPersistence -> TaskTransition::create() & TaskTransition::get_current()
// StepTransitionPersistence -> WorkflowStepTransition::create() & WorkflowStepTransition::get_current()
}
Benefits:
- No SQL duplication between state machine and models
- Atomic transaction handling in models
- Single source of truth for database operations
- Independent testability of model methods
Transition Storage
Task Transitions (tasker.task_transitions)
CREATE TABLE tasker.task_transitions (
task_transition_uuid UUID PRIMARY KEY DEFAULT uuid_generate_v7(),
task_uuid UUID NOT NULL,
to_state VARCHAR NOT NULL,
from_state VARCHAR,
processor_uuid UUID, -- Ownership tracking
metadata JSONB,
sort_key INTEGER NOT NULL,
most_recent BOOLEAN DEFAULT false,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
Step Transitions (tasker.workflow_step_transitions)
CREATE TABLE tasker.workflow_step_transitions (
workflow_step_transition_uuid UUID PRIMARY KEY DEFAULT uuid_generate_v7(),
workflow_step_uuid UUID NOT NULL,
to_state VARCHAR NOT NULL,
from_state VARCHAR,
metadata JSONB,
sort_key INTEGER NOT NULL,
most_recent BOOLEAN DEFAULT false,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
Current State Resolution
Both transition models implement efficient current state resolution:
#![allow(unused)]
fn main() {
// O(1) current state lookup using most_recent flag
TaskTransition::get_current(pool, task_uuid) -> Option<TaskTransition>
WorkflowStepTransition::get_current(pool, step_uuid) -> Option<WorkflowStepTransition>
}
Performance Optimization:
most_recent = trueflag on latest transition only- Indexed queries:
(task_uuid, most_recent) WHERE most_recent = true - Atomic flag updates during transition creation
Atomic Transitions with Ownership
Atomic transitions with processor ownership:
#![allow(unused)]
fn main() {
impl TaskTransitionPersistence {
pub async fn transition_with_ownership(
&self,
task_uuid: Uuid,
from_state: TaskState,
to_state: TaskState,
processor_uuid: Uuid,
metadata: Option<Value>,
pool: &PgPool,
) -> PersistenceResult<bool>
}
}
Atomicity Guarantees:
- Single database transaction for state change
- Processor UUID stored in dedicated column
most_recentflag updated atomically- Race condition prevention through database constraints
Action System
Both state machines execute actions after successful transitions:
Task Actions
- PublishTransitionEventAction: Publishes task state change events
- UpdateTaskCompletionAction: Updates task completion status
- ErrorStateCleanupAction: Performs error state cleanup
Step Actions
- PublishTransitionEventAction: Publishes step state change events
- UpdateStepResultsAction: Updates step results and execution data
- TriggerStepDiscoveryAction: Triggers task-level step discovery
- ErrorStateCleanupAction: Performs step error cleanup
Actions execute sequentially after transition persistence, ensuring consistency.
State Machine Integration Points
Task <-> Step Coordination
- Step Discovery: Task initialization discovers ready steps
- Step Enqueueing: Task enqueues discovered steps to worker queues
- Progress Monitoring: Task monitors step completion via events
- Result Processing: Task processes step results and discovers next steps
- Completion Detection: Task completes when all steps are complete
Event-Driven Communication
- pg_notify: PostgreSQL notifications for real-time coordination
- Event Publishers: Publish state transition events to event system
- Event Subscribers: React to state changes across system boundaries
- Queue Integration: Provider-agnostic message queues (PGMQ or RabbitMQ) for worker communication
Worker Integration
- Step Claiming: Workers claim
Enqueuedsteps from queues - Progress Updates: Workers transition steps to
InProgress - Result Submission: Workers submit results via
EnqueueForOrchestration - Orchestration Processing: Orchestration processes results and completes steps
This sophisticated state machine architecture provides the foundation for reliable, auditable, and scalable workflow orchestration in the tasker-core system.
Step Result Audit System
The step result audit system provides SOC2-compliant audit trails for workflow step execution results, enabling complete attribution tracking for compliance and debugging.
Audit Table Design
The tasker.workflow_step_result_audit table stores lightweight references with attribution data:
CREATE TABLE tasker.workflow_step_result_audit (
workflow_step_result_audit_uuid UUID PRIMARY KEY DEFAULT uuid_generate_v7(),
workflow_step_uuid UUID NOT NULL REFERENCES tasker.workflow_steps,
workflow_step_transition_uuid UUID NOT NULL REFERENCES tasker.workflow_step_transitions,
task_uuid UUID NOT NULL REFERENCES tasker.tasks,
recorded_at TIMESTAMP NOT NULL DEFAULT NOW(),
-- Attribution (NEW data not in transitions)
worker_uuid UUID,
correlation_id UUID,
-- Extracted scalars for indexing/filtering
success BOOLEAN NOT NULL,
execution_time_ms BIGINT,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
UNIQUE (workflow_step_uuid, workflow_step_transition_uuid)
);
Design Principles
-
No Data Duplication: Full execution results already exist in
tasker.workflow_step_transitions.metadata. The audit table stores references only. -
Attribution Capture: The audit system captures NEW attribution data:
worker_uuid: Which worker instance processed the stepcorrelation_id: Distributed tracing identifier for request correlation
-
Indexed Scalars: Success and execution time are extracted for efficient filtering without JSON parsing.
-
SQL Trigger: A database trigger (
trg_step_result_audit) guarantees audit record creation when workers persist results, ensuring SOC2 compliance.
Attribution Flow
Attribution data flows through the system via TransitionContext:
#![allow(unused)]
fn main() {
// Worker creates attribution context
let context = TransitionContext::with_worker(
worker_uuid,
Some(correlation_id),
);
// Context is merged into transition metadata
state_machine.transition_with_context(event, Some(context)).await?;
// SQL trigger extracts attribution from metadata
-- In trigger:
-- v_worker_uuid := (NEW.metadata->>'worker_uuid')::UUID;
-- v_correlation_id := (NEW.metadata->>'correlation_id')::UUID;
}
Trigger Behavior
The create_step_result_audit trigger fires on transitions to:
enqueued_for_orchestration: Successful step completionenqueued_as_error_for_orchestration: Failed step completion
These states represent when workers persist execution results, creating the audit trail.
Querying Audit History
Via API
GET /v1/tasks/{task_uuid}/workflow_steps/{step_uuid}/audit
Returns audit records with full transition details via JOIN, ordered by recorded_at DESC.
Via Client
#![allow(unused)]
fn main() {
let audit_history = client.get_step_audit_history(task_uuid, step_uuid).await?;
for record in audit_history {
println!("Worker: {:?}, Success: {}, Time: {:?}ms",
record.worker_uuid,
record.success,
record.execution_time_ms
);
}
}
Via Model
#![allow(unused)]
fn main() {
// Get audit history for a step with full transition details
let history = WorkflowStepResultAudit::get_audit_history(&pool, step_uuid).await?;
// Get all audit records for a task
let task_history = WorkflowStepResultAudit::get_task_audit_history(&pool, task_uuid).await?;
// Query by worker for attribution investigation
let worker_records = WorkflowStepResultAudit::get_by_worker(&pool, worker_uuid, Some(100)).await?;
// Query by correlation ID for distributed tracing
let correlated = WorkflowStepResultAudit::get_by_correlation_id(&pool, correlation_id).await?;
}
Indexes for Common Query Patterns
The audit table includes optimized indexes:
idx_audit_step_uuid: Primary query - get audit history for a stepidx_audit_task_uuid: Get all audit records for a taskidx_audit_recorded_at: Time-range queries for SOC2 audit reportsidx_audit_worker_uuid: Attribution investigation (partial index)idx_audit_correlation_id: Distributed tracing queries (partial index)idx_audit_success: Success/failure filtering
Historical Data
The migration includes a backfill for existing transitions. Historical records will have NULL attribution (worker_uuid, correlation_id) since that data wasn’t captured before the audit system was introduced.