Task and Step Readiness and Execution
Last Updated: 2026-01-10
Audience: Developers, Architects
Status: Active
Related Docs: Documentation Hub | States and Lifecycles | Events and Commands
This document describes the SQL functions and database logic that drive task and step readiness analysis, dependency resolution, and execution coordination in the tasker-core system.
Overview
The tasker-core system performs its workflow orchestration in PostgreSQL functions rather than in application code. Running this logic at the database level allows set-based operations, atomic transactions, and fewer network round trips while maintaining data consistency.
The SQL function system supports several critical categories of operations:
- Step Readiness Analysis: Complex dependency resolution and backoff calculations
- DAG Operations: Cycle detection, depth calculation, and parallel execution discovery
- State Management: Atomic state transitions with processor ownership tracking
- Analytics and Monitoring: Performance metrics and system health analysis
- Task Execution Context: Comprehensive execution metadata and results management
SQL Function Architecture
Function Categories
The SQL functions are organized into logical categories as defined in tasker-shared/src/database/sql_functions.rs:
1. Step Readiness Analysis
- get_step_readiness_status(task_uuid, step_uuids[]): Comprehensive dependency analysis
- calculate_backoff_delay(attempts, base_delay): Exponential backoff calculation
- check_step_dependencies(step_uuid): Parent completion validation
- get_ready_steps(task_uuid): Parallel execution candidate discovery
2. DAG Operations
- detect_cycle(from_step_uuid, to_step_uuid): Cycle detection using recursive CTEs
- calculate_dependency_levels(task_uuid): Topological depth calculation
- calculate_step_depth(step_uuid): Individual step depth analysis
- get_step_transitive_dependencies(step_uuid): Full dependency tree traversal
3. State Management
- transition_task_state_atomic(task_uuid, from_state, to_state, processor_uuid): Atomic state transitions with ownership
- get_current_task_state(task_uuid): Current task state resolution
- finalize_task_completion(task_uuid): Task completion orchestration
4. Analytics and Monitoring
- get_analytics_metrics(since_timestamp): Comprehensive system analytics
- get_system_health_counts(): System-wide health and performance metrics
- get_slowest_steps(limit): Performance optimization analysis
- get_slowest_tasks(limit): Task performance analysis
5. Task Discovery and Execution
- get_next_ready_task(): Single task discovery for orchestration
- get_next_ready_tasks(limit): Batch task discovery for scaling
- get_task_ready_info(task_uuid): Detailed task readiness information
- get_task_execution_context(task_uuid): Complete execution metadata
Database Schema Foundation
Core Tables
The SQL functions operate on a comprehensive schema designed for UUID v7
performance and scalability. All tables reside in the tasker schema
with simplified names. With search_path = tasker, public, queries use unqualified
table names.
Primary Tables
- tasks: Main workflow instances with UUID v7 primary keys
- workflow_steps: Individual workflow steps with dependency relationships
- task_transitions: Task state change audit trail with processor tracking
- workflow_step_transitions: Step state change audit trail
Registry Tables
- task_namespaces: Workflow namespace definitions
- named_tasks: Task type templates and metadata
- named_steps: Step type definitions and handlers
- workflow_step_edges: Step dependency relationships (DAG structure)
Richer Task State Enhancements
The richer task states migration (migrations/tasker/20251209000000_tas41_richer_task_states.sql) enhanced the
schema with:
Task State Management:
-- 12 comprehensive task states
ALTER TABLE task_transitions
ADD CONSTRAINT chk_task_transitions_to_state
CHECK (to_state IN (
'pending', 'initializing', 'enqueuing_steps', 'steps_in_process',
'evaluating_results', 'waiting_for_dependencies', 'waiting_for_retry',
'blocked_by_failures', 'complete', 'error', 'cancelled', 'resolved_manually'
));
Processor Ownership Tracking:
ALTER TABLE task_transitions
ADD COLUMN processor_uuid UUID,
ADD COLUMN transition_metadata JSONB DEFAULT '{}';
Atomic State Transitions:
CREATE OR REPLACE FUNCTION transition_task_state_atomic(
p_task_uuid UUID,
p_from_state VARCHAR,
p_to_state VARCHAR,
p_processor_uuid UUID,
p_metadata JSONB DEFAULT '{}'
) RETURNS BOOLEAN
Step Readiness Analysis
Recent Enhancements
WaitingForRetry State Support (Migration 20250927000000)
The step readiness system was enhanced to support the new WaitingForRetry state, which distinguishes retryable failures from permanent errors:
Key Changes:
- Helper Functions: Added calculate_step_next_retry_time() and evaluate_step_state_readiness() for consistent backoff logic
- State Recognition: Updated readiness evaluation to treat waiting_for_retry as a ready-eligible state alongside pending
- Backoff Calculation: Centralized exponential backoff logic with configurable backoff periods
- Performance Optimization: Introduced task-scoped CTEs to eliminate table scans for batch operations
Semantic Impact:
- Before: error state included both retryable and permanent failures
- After: error = permanent only, waiting_for_retry = awaiting backoff for retry
Backoff Logic Consolidation (October 2025)
The backoff calculation system was consolidated to eliminate configuration conflicts and race conditions:
Key Changes:
- Configuration Alignment: Single source of truth (TOML config) with max_backoff_seconds = 60
- Parameterized SQL Functions: calculate_step_next_retry_time() accepts configurable max delay and multiplier
- Atomic Updates: Row-level locking prevents concurrent backoff update conflicts
- Timing Consistency: last_attempted_at updated atomically with backoff_request_seconds
Issues Resolved:
- Configuration Conflicts: Eliminated three conflicting max values (30s SQL, 60s code, 300s TOML)
- Race Conditions: Added SELECT FOR UPDATE locking in BackoffCalculator
- Hardcoded Values: Removed hardcoded 30-second cap and power(2, attempts) in SQL
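The consolidated rule can be illustrated with a small Rust sketch. It is not the project's implementation: the SQL body of calculate_step_next_retry_time() is not reproduced in this document, so the formula below (multiplier raised to the attempt count, capped at the configured maximum, with a custom backoff taking precedence) is an assumption based on the description here. The function name backoff_delay_seconds is hypothetical.

```rust
/// Hypothetical helper illustrating the backoff rule described above.
/// `max_backoff_seconds` and `multiplier` stand in for the TOML-configured
/// values; this document quotes 60 and 2.0 as the defaults.
pub fn backoff_delay_seconds(
    custom_backoff_seconds: Option<u32>,
    attempts: u32,
    max_backoff_seconds: u32,
    multiplier: f64,
) -> u32 {
    // A custom backoff requested for the step takes precedence (primary path).
    if let Some(custom) = custom_backoff_seconds {
        return custom;
    }
    // Assumed fallback: multiplier^attempts, capped at the configured maximum.
    let exponent = attempts.min(i32::MAX as u32) as i32;
    let exponential = multiplier.powi(exponent);
    exponential.min(f64::from(max_backoff_seconds)) as u32
}
```

With the defaults, three attempts would yield an 8-second delay and ten attempts would hit the 60-second cap. Passing the cap and multiplier as arguments, rather than hardcoding them, mirrors the single-source-of-truth goal of the consolidation.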
Helper Functions Enhanced:
- calculate_step_next_retry_time(): Now parameterized with configuration values
  CREATE OR REPLACE FUNCTION calculate_step_next_retry_time(
      backoff_request_seconds INTEGER,
      last_attempted_at TIMESTAMP,
      failure_time TIMESTAMP,
      attempts INTEGER,
      p_max_backoff_seconds INTEGER DEFAULT 60,
      p_backoff_multiplier NUMERIC DEFAULT 2.0
  ) RETURNS TIMESTAMP
  - Respects custom backoff periods from step configuration (primary path)
  - Falls back to exponential backoff with configurable parameters
  - Defaults aligned with TOML config (60s max, 2.0 multiplier)
  - Used consistently across all readiness evaluation
- set_step_backoff_atomic(): New atomic update function
  CREATE OR REPLACE FUNCTION set_step_backoff_atomic(
      p_step_uuid UUID,
      p_backoff_seconds INTEGER
  ) RETURNS BOOLEAN
  - Provides transactional guarantee for concurrent updates
  - Updates both backoff_request_seconds and last_attempted_at
  - Ensures timing consistency with SQL calculations
- evaluate_step_state_readiness(): Determines if a step is ready for execution
  CREATE OR REPLACE FUNCTION evaluate_step_state_readiness(
      current_state TEXT,
      processed BOOLEAN,
      in_process BOOLEAN,
      dependencies_satisfied BOOLEAN,
      retry_eligible BOOLEAN,
      retryable BOOLEAN,
      next_retry_time TIMESTAMP
  ) RETURNS BOOLEAN
  - Recognizes both pending and waiting_for_retry as ready-eligible states
  - Validates backoff period has expired before allowing retry
  - Ensures dependencies are satisfied and retry limits not exceeded
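The readiness rule that evaluate_step_state_readiness() is described as enforcing can be sketched in Rust as follows. This is an illustration under assumptions, not a port of the SQL: the function body is not shown in this document, the name is_ready_for_execution is hypothetical, and the retryable argument is left out because its exact role is not specified here.

```rust
use std::time::SystemTime;

/// Hypothetical in-process version of the readiness check described above.
/// Assumes a step is ready when it is in a ready-eligible state, is not
/// processed or in flight, has satisfied dependencies, may still retry,
/// and any backoff deadline has passed.
pub fn is_ready_for_execution(
    current_state: &str,
    processed: bool,
    in_process: bool,
    dependencies_satisfied: bool,
    retry_eligible: bool,
    next_retry_time: Option<SystemTime>,
    now: SystemTime,
) -> bool {
    // Both fresh steps and steps awaiting retry are ready-eligible.
    let state_ok = matches!(current_state, "pending" | "waiting_for_retry");
    // No deadline means no backoff is pending.
    let backoff_elapsed = next_retry_time.map_or(true, |deadline| deadline <= now);
    state_ok
        && !processed
        && !in_process
        && dependencies_satisfied
        && retry_eligible
        && backoff_elapsed
}
```

Passing `now` explicitly keeps the check deterministic, which makes it easy to compare against the SQL result for a fixed timestamp.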
Step Readiness Status
The get_step_readiness_status function provides comprehensive analysis of step
execution eligibility:
CREATE OR REPLACE FUNCTION get_step_readiness_status(
input_task_uuid UUID,
step_uuids UUID[] DEFAULT NULL
) RETURNS TABLE(
workflow_step_uuid UUID,
task_uuid UUID,
named_step_uuid UUID,
name VARCHAR,
current_state VARCHAR,
dependencies_satisfied BOOLEAN,
retry_eligible BOOLEAN,
ready_for_execution BOOLEAN,
last_failure_at TIMESTAMP,
next_retry_at TIMESTAMP,
total_parents INTEGER,
completed_parents INTEGER,
attempts INTEGER,
retry_limit INTEGER,
backoff_request_seconds INTEGER,
last_attempted_at TIMESTAMP
)
Key Analysis Features
Dependency Satisfaction:
- Validates all parent steps are in complete or resolved_manually states
- Handles complex DAG structures with multiple dependency paths
- Supports conditional dependencies based on parent results
Retry Logic:
- Exponential backoff calculation: 2^attempts seconds (max 30)
- Custom backoff periods from step configuration
- Retry limit enforcement to prevent infinite loops
- Failure tracking with temporal analysis
Execution Readiness:
- State validation (must be pending or error)
- Dependency satisfaction confirmation
- Retry eligibility assessment
- Backoff period expiration checking
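The dependency-satisfaction rule listed above can be sketched in a few lines of Rust. The function name parent_completion is hypothetical, and the sketch works on plain state strings rather than database rows; it only illustrates the counting rule (a parent counts as done when it is complete or resolved_manually).

```rust
/// Hypothetical helper: returns (total_parents, completed_parents, satisfied)
/// for a step, given the current state of each of its parent steps.
pub fn parent_completion(parent_states: &[&str]) -> (usize, usize, bool) {
    let total = parent_states.len();
    let completed = parent_states
        .iter()
        .filter(|state| matches!(**state, "complete" | "resolved_manually"))
        .count();
    // A root step has no parents, so it is trivially satisfied.
    (total, completed, completed == total)
}
```

The two counts correspond to the total_parents and completed_parents columns returned by get_step_readiness_status, and the boolean to dependencies_satisfied.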
Step Readiness Implementation
The Rust integration provides type-safe access to step readiness analysis:
use chrono::NaiveDateTime;
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use uuid::Uuid;

#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct StepReadinessStatus {
    pub workflow_step_uuid: Uuid,
    pub task_uuid: Uuid,
    pub named_step_uuid: Uuid,
    pub name: String,
    pub current_state: String,
    pub dependencies_satisfied: bool,
    pub retry_eligible: bool,
    pub ready_for_execution: bool,
    pub last_failure_at: Option<NaiveDateTime>,
    pub next_retry_at: Option<NaiveDateTime>,
    pub total_parents: i32,
    pub completed_parents: i32,
    pub attempts: i32,
    pub retry_limit: i32,
    pub backoff_request_seconds: Option<i32>,
    pub last_attempted_at: Option<NaiveDateTime>,
}

impl StepReadinessStatus {
    pub fn can_execute_now(&self) -> bool {
        self.ready_for_execution
    }

    /// Returns why the step cannot run yet, or None when it is ready.
    pub fn blocking_reason(&self) -> Option<&'static str> {
        // A ready step has no blocking reason
        if self.ready_for_execution {
            return None;
        }
        if !self.dependencies_satisfied {
            return Some("dependencies_not_satisfied");
        }
        if !self.retry_eligible {
            return Some("retry_not_eligible");
        }
        Some("invalid_state")
    }

    pub fn effective_backoff_seconds(&self) -> i32 {
        self.backoff_request_seconds.unwrap_or_else(|| {
            if self.attempts > 0 {
                // checked_pow avoids an overflow once attempts reaches 31
                2_i32
                    .checked_pow(self.attempts as u32)
                    .map_or(30, |delay| delay.min(30))
            } else {
                0
            }
        })
    }
}
DAG Operations and Dependency Resolution
Dependency Level Calculation
The calculate_dependency_levels function uses recursive CTEs to perform
topological analysis of the workflow DAG:
CREATE OR REPLACE FUNCTION calculate_dependency_levels(input_task_uuid UUID)
RETURNS TABLE(workflow_step_uuid UUID, dependency_level INTEGER)
LANGUAGE plpgsql STABLE AS $$
BEGIN
RETURN QUERY
WITH RECURSIVE dependency_levels AS (
-- Base case: Find root nodes (steps with no dependencies)
SELECT
ws.workflow_step_uuid,
0 as level
FROM workflow_steps ws
WHERE ws.task_uuid = input_task_uuid
AND NOT EXISTS (
SELECT 1
FROM workflow_step_edges wse
WHERE wse.to_step_uuid = ws.workflow_step_uuid
)
UNION ALL
-- Recursive case: Find children of current level nodes
SELECT
wse.to_step_uuid as workflow_step_uuid,
dl.level + 1 as level
FROM dependency_levels dl
JOIN workflow_step_edges wse ON wse.from_step_uuid = dl.workflow_step_uuid
JOIN workflow_steps ws ON ws.workflow_step_uuid = wse.to_step_uuid
WHERE ws.task_uuid = input_task_uuid
)
SELECT
dl.workflow_step_uuid,
MAX(dl.level) as dependency_level -- Use MAX to handle multiple paths
FROM dependency_levels dl
GROUP BY dl.workflow_step_uuid
ORDER BY dependency_level, workflow_step_uuid;
END;
$$;
Dependency Level Benefits
Parallel Execution Planning:
- Steps at the same dependency level can execute in parallel
- Enables optimal resource utilization across workers
- Supports batch enqueueing for scalability
Execution Ordering:
- Level 0: Root steps (no dependencies) - can start immediately
- Level N: Steps requiring completion of level N-1 steps
- Topological ordering ensures dependency satisfaction
Performance Optimization:
- Single query provides complete dependency analysis
- Avoids N+1 query problems in dependency resolution
- Enables batch processing optimizations
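To make the level calculation concrete, here is a minimal in-memory Rust sketch of the same idea: each step's level is the length of the longest path from any root, which is what MAX(dl.level) selects in the recursive CTE. The function name dependency_levels and the string step identifiers are illustrative assumptions; the sketch uses a Kahn-style traversal instead of recursion and assumes the edge list is acyclic.

```rust
use std::collections::HashMap;

/// Hypothetical helper: longest-path depth per step for an acyclic edge list.
/// Edges are (from_step, to_step) pairs, matching workflow_step_edges.
pub fn dependency_levels<'a>(
    steps: &[&'a str],
    edges: &[(&'a str, &'a str)],
) -> HashMap<&'a str, u32> {
    // Count unprocessed parents per step and record each step's children.
    let mut remaining_parents: HashMap<&'a str, usize> =
        steps.iter().map(|step| (*step, 0)).collect();
    let mut children: HashMap<&'a str, Vec<&'a str>> = HashMap::new();
    for &(from, to) in edges {
        *remaining_parents.entry(to).or_insert(0) += 1;
        remaining_parents.entry(from).or_insert(0);
        children.entry(from).or_default().push(to);
    }

    // Roots (no parents) start at level 0.
    let mut ready: Vec<&'a str> = remaining_parents
        .iter()
        .filter(|(_, count)| **count == 0)
        .map(|(step, _)| *step)
        .collect();
    let mut levels: HashMap<&'a str, u32> =
        ready.iter().map(|step| (*step, 0)).collect();

    // A step is visited only after all of its parents, so its level is final.
    while let Some(step) = ready.pop() {
        let level = levels[step];
        for &child in children.get(step).into_iter().flatten() {
            let child_level = levels.entry(child).or_insert(0);
            *child_level = (*child_level).max(level + 1);
            let count = remaining_parents
                .get_mut(child)
                .expect("child was registered while reading edges");
            *count -= 1;
            if *count == 0 {
                ready.push(child);
            }
        }
    }
    levels
}
```

For a diamond-shaped workflow (a feeds b and c, which both feed d) with an extra direct edge from a to d, step d still lands on level 2 because the longest path wins. Steps that share a level are the candidates for parallel execution.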
Transitive Dependencies
The get_step_transitive_dependencies function provides complete ancestor analysis:
CREATE OR REPLACE FUNCTION get_step_transitive_dependencies(step_uuid UUID)
RETURNS TABLE(
step_name VARCHAR,
step_uuid UUID,
task_uuid UUID,
distance INTEGER,
processed BOOLEAN,
results JSONB
)
This enables step handlers to access results from any ancestor step:
use std::collections::HashMap;
use uuid::Uuid;

impl SqlFunctionExecutor {
    pub async fn get_step_dependency_results_map(
        &self,
        step_uuid: Uuid,
    ) -> Result<HashMap<String, StepExecutionResult>, sqlx::Error> {
        let dependencies = self.get_step_transitive_dependencies(step_uuid).await?;
        Ok(dependencies
            .into_iter()
            // Keep only ancestors that have finished processing
            .filter(|dep| dep.processed)
            // Skip ancestors without results; convert the rest
            .filter_map(|dep| {
                let results: StepExecutionResult = dep.results?.into();
                Some((dep.step_name, results))
            })
            .collect())
    }
}
Task Execution Context
Recent Enhancements
Permanently Blocked Detection Fix (Migration 20251001000000)
The get_task_execution_context function was enhanced to correctly identify tasks blocked by permanent errors:
Problem: The function only checked attempts >= retry_limit to detect permanently blocked steps, missing cases where workers marked errors as non-retryable (e.g., missing handlers, configuration errors).
Solution: Updated permanently_blocked_steps calculation to check both conditions:
COUNT(CASE WHEN sd.current_state = 'error'
AND (sd.attempts >= retry_limit OR sd.retry_eligible = false) THEN 1 END)
Impact:
- execution_status: Now correctly returns blocked_by_failures instead of waiting_for_dependencies for tasks with non-retryable errors
- recommended_action: Returns handle_failures instead of wait_for_dependencies
- health_status: Returns blocked instead of recovering when appropriate
This fix ensures the orchestration system properly identifies when manual intervention is needed versus when a task is simply waiting for retry backoff.
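The corrected condition can be expressed outside SQL as a simple filter. The sketch below is illustrative only: StepSnapshot and permanently_blocked_steps are hypothetical names, and the struct carries just the fields that the COUNT(CASE ...) expression reads.

```rust
/// Hypothetical snapshot of the step fields used by the blocked-step count.
pub struct StepSnapshot {
    pub current_state: &'static str,
    pub attempts: i32,
    pub retry_limit: i32,
    pub retry_eligible: bool,
}

/// Counts steps in `error` that will never retry: either the retry budget
/// is exhausted or a worker marked the failure as non-retryable.
pub fn permanently_blocked_steps(steps: &[StepSnapshot]) -> usize {
    steps
        .iter()
        .filter(|step| {
            step.current_state == "error"
                && (step.attempts >= step.retry_limit || !step.retry_eligible)
        })
        .count()
}
```

Before the fix, only the first half of the OR was checked, so a step with one attempt and a non-retryable error would not have been counted.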
Task Discovery and Orchestration
Task Readiness Discovery
The system provides multiple functions for task discovery based on orchestration needs:
Single Task Discovery
CREATE OR REPLACE FUNCTION get_next_ready_task()
RETURNS TABLE(
task_uuid UUID,
task_name VARCHAR,
priority INTEGER,
namespace_name VARCHAR,
ready_steps_count BIGINT,
computed_priority NUMERIC,
current_state VARCHAR
)
Batch Task Discovery
CREATE OR REPLACE FUNCTION get_next_ready_tasks(limit_count INTEGER)
RETURNS TABLE(
task_uuid UUID,
task_name VARCHAR,
priority INTEGER,
namespace_name VARCHAR,
ready_steps_count BIGINT,
computed_priority NUMERIC,
current_state VARCHAR
)
Task Ready Information
The ReadyTaskInfo structure provides comprehensive task metadata for
orchestration decisions:
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct ReadyTaskInfo {
    pub task_uuid: Uuid,
    pub task_name: String,
    pub priority: i32,
    pub namespace_name: String,
    pub ready_steps_count: i64,
    pub computed_priority: Option<BigDecimal>,
    pub current_state: String,
}
Priority Calculation:
- Base priority from task configuration
- Dynamic priority adjustment based on age, retry attempts
- Namespace-based priority modifiers
- SLA-based priority escalation
Ready Steps Count:
- Real-time count of steps eligible for execution
- Used for batch size optimization
- Influences orchestration scheduling decisions
State Management and Atomic Transitions
Atomic State Transitions
The enhanced state machine provides atomic transitions with processor ownership:
CREATE OR REPLACE FUNCTION transition_task_state_atomic(
p_task_uuid UUID,
p_from_state VARCHAR,
p_to_state VARCHAR,
p_processor_uuid UUID,
p_metadata JSONB DEFAULT '{}'
) RETURNS BOOLEAN AS $$
DECLARE
v_sort_key INTEGER;
v_transitioned BOOLEAN := FALSE;
BEGIN
-- Get next sort key
SELECT COALESCE(MAX(sort_key), 0) + 1 INTO v_sort_key
FROM task_transitions
WHERE task_uuid = p_task_uuid;
-- Atomically transition only if in expected state
WITH current_state AS (
SELECT to_state, processor_uuid
FROM task_transitions
WHERE task_uuid = p_task_uuid
AND most_recent = true
FOR UPDATE
),
ownership_check AS (
SELECT
CASE
-- States requiring ownership
WHEN cs.to_state IN ('initializing', 'enqueuing_steps',
'steps_in_process', 'evaluating_results')
THEN cs.processor_uuid = p_processor_uuid OR cs.processor_uuid IS NULL
-- Other states don't require ownership
ELSE true
END as can_transition
FROM current_state cs
WHERE cs.to_state = p_from_state
),
do_update AS (
UPDATE task_transitions
SET most_recent = false
WHERE task_uuid = p_task_uuid
AND most_recent = true
AND EXISTS (SELECT 1 FROM ownership_check WHERE can_transition)
RETURNING task_uuid
)
INSERT INTO task_transitions (
task_uuid, from_state, to_state,
processor_uuid, transition_metadata,
sort_key, most_recent, created_at, updated_at
)
SELECT
p_task_uuid, p_from_state, p_to_state,
p_processor_uuid, p_metadata,
v_sort_key, true, NOW(), NOW()
WHERE EXISTS (SELECT 1 FROM do_update);
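-- FOUND is true only when the INSERT above wrote the new transition row.
-- (Assigning ROW_COUNT to a BOOLEAN and comparing it with 0 would fail.)
v_transitioned := FOUND;
RETURN v_transitioned;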
END;
$$ LANGUAGE plpgsql;
Key Features
Atomic Operation:
- Single transaction with row-level locking
- Compare-and-swap semantics prevent race conditions
- Returns boolean indicating success/failure
Ownership Validation:
- Processor ownership required for active states
- Prevents concurrent processing by multiple orchestrators
- Supports ownership claiming for unowned tasks
State Consistency:
- Validates current state matches expected from_state
- Maintains audit trail with complete transition history
- Updates most_recent flags atomically
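The compare-and-swap and ownership rules can be modeled in memory to show which calls succeed. The Rust sketch below is a simplified illustration, not the production logic: TaskHistory and Transition are hypothetical types, a u64 stands in for processor_uuid, and a plain Vec replaces the task_transitions table, so there is no locking or real concurrency.

```rust
/// States that only the owning processor may leave, per the ownership rule.
const OWNED_STATES: [&str; 4] = [
    "initializing",
    "enqueuing_steps",
    "steps_in_process",
    "evaluating_results",
];

#[derive(Debug, Clone)]
pub struct Transition {
    pub to_state: String,
    pub processor: Option<u64>, // stand-in for processor_uuid
    pub most_recent: bool,
}

/// Hypothetical in-memory model of one task's rows in task_transitions.
#[derive(Debug)]
pub struct TaskHistory {
    pub transitions: Vec<Transition>,
}

impl TaskHistory {
    pub fn new(initial_state: &str) -> Self {
        Self {
            transitions: vec![Transition {
                to_state: initial_state.to_string(),
                processor: None,
                most_recent: true,
            }],
        }
    }

    /// Returns true only if the task is still in `from_state` and the caller
    /// is allowed to move it; otherwise nothing changes.
    pub fn transition(&mut self, from_state: &str, to_state: &str, processor: u64) -> bool {
        let current = match self.transitions.iter_mut().find(|t| t.most_recent) {
            Some(transition) => transition,
            None => return false,
        };
        // Compare: the caller's view of the state must still be current.
        if current.to_state != from_state {
            return false;
        }
        // Ownership: active states can only be left by their owner,
        // unless no processor has claimed the task yet.
        let owned = OWNED_STATES.contains(&current.to_state.as_str());
        if owned && matches!(current.processor, Some(owner) if owner != processor) {
            return false;
        }
        // Swap: retire the old row and append the new one.
        current.most_recent = false;
        self.transitions.push(Transition {
            to_state: to_state.to_string(),
            processor: Some(processor),
            most_recent: true,
        });
        true
    }
}
```

A caller that receives false is expected to re-read the current state and decide whether to retry, which is the same contract the SQL function exposes through its boolean return value.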
Current State Resolution
Fast current state lookups are provided through optimized queries:
impl SqlFunctionExecutor {
    pub async fn get_current_task_state(&self, task_uuid: Uuid)
        -> Result<TaskState, sqlx::Error> {
        // Outer Option: whether a row came back.
        // Inner Option: whether the SQL function returned NULL.
        let state_str = sqlx::query_scalar!(
            r#"SELECT get_current_task_state($1) as "state""#,
            task_uuid
        )
        .fetch_optional(&self.pool)
        .await?
        .ok_or(sqlx::Error::RowNotFound)?;

        match state_str {
            Some(state) => TaskState::try_from(state.as_str())
                .map_err(|_| sqlx::Error::Decode("Invalid task state".into())),
            None => Err(sqlx::Error::RowNotFound),
        }
    }
}
Analytics and System Health
System Health Monitoring
The get_system_health_counts function provides comprehensive system visibility:
CREATE OR REPLACE FUNCTION get_system_health_counts()
RETURNS TABLE(
pending_tasks BIGINT,
initializing_tasks BIGINT,
enqueuing_steps_tasks BIGINT,
steps_in_process_tasks BIGINT,
evaluating_results_tasks BIGINT,
waiting_for_dependencies_tasks BIGINT,
waiting_for_retry_tasks BIGINT,
blocked_by_failures_tasks BIGINT,
complete_tasks BIGINT,
error_tasks BIGINT,
cancelled_tasks BIGINT,
resolved_manually_tasks BIGINT,
total_tasks BIGINT,
-- step counts...
) AS $$
Health Score Calculation
The Rust implementation provides derived health metrics:
impl SystemHealthCounts {
    pub fn health_score(&self) -> f64 {
        // An empty system is treated as fully healthy
        if self.total_tasks == 0 {
            return 1.0;
        }
        let success_rate = self.complete_tasks as f64 / self.total_tasks as f64;
        let error_rate = self.error_tasks as f64 / self.total_tasks as f64;
        // Share of the connection pool that is still free
        let connection_health = 1.0 -
            (self.active_connections as f64 / self.max_connections as f64).min(1.0);
        // Weighted combination: 50% success rate, 30% non-error rate, 20% connection health
        (success_rate * 0.5) + ((1.0 - error_rate) * 0.3) + (connection_health * 0.2)
    }

    pub fn is_under_heavy_load(&self) -> bool {
        let connection_pressure =
            self.active_connections as f64 / self.max_connections as f64;
        let error_rate = if self.total_tasks > 0 {
            self.error_tasks as f64 / self.total_tasks as f64
        } else {
            0.0
        };
        // Heavy load: pool more than 80% used or more than 20% of tasks in error
        connection_pressure > 0.8 || error_rate > 0.2
    }
}
Analytics Metrics
The get_analytics_metrics function provides comprehensive performance analysis:
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct AnalyticsMetrics {
    pub active_tasks_count: i64,
    pub total_namespaces_count: i64,
    pub unique_task_types_count: i64,
    pub system_health_score: BigDecimal,
    pub task_throughput: i64,
    pub completion_count: i64,
    pub error_count: i64,
    pub completion_rate: BigDecimal,
    pub error_rate: BigDecimal,
    pub avg_task_duration: BigDecimal,
    pub avg_step_duration: BigDecimal,
    pub step_throughput: i64,
    pub analysis_period_start: DateTime<Utc>,
    pub calculated_at: DateTime<Utc>,
}
Performance Optimization Analysis
Slowest Steps Analysis
The system provides performance optimization guidance through detailed analysis:
CREATE OR REPLACE FUNCTION get_slowest_steps(
since_timestamp TIMESTAMP WITH TIME ZONE,
limit_count INTEGER,
namespace_filter VARCHAR,
task_name_filter VARCHAR,
version_filter VARCHAR
) RETURNS TABLE(
named_step_uuid UUID,
step_name VARCHAR,
avg_duration_seconds NUMERIC,
max_duration_seconds NUMERIC,
min_duration_seconds NUMERIC,
execution_count INTEGER,
error_count INTEGER,
error_rate NUMERIC,
last_executed_at TIMESTAMP WITH TIME ZONE
)
Slowest Tasks Analysis
Similar analysis is available at the task level:
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct SlowestTaskAnalysis {
    pub named_task_uuid: Uuid,
    pub task_name: String,
    pub avg_duration_seconds: f64,
    pub max_duration_seconds: f64,
    pub min_duration_seconds: f64,
    pub execution_count: i32,
    pub avg_step_count: f64,
    pub error_count: i32,
    pub error_rate: f64,
    pub last_executed_at: Option<DateTime<Utc>>,
}
Critical Problem-Solving SQL Functions
PGMQ Message Race Condition Prevention
Problem: Multiple Workers Claiming Same Message
When multiple workers simultaneously try to process steps from the same queue, PGMQ’s standard
pgmq.read() function returns whichever visible messages come next in the queue rather than a specific message ID, so a worker can miss the message
it was specifically notified about. This creates inefficiency and potential race conditions.
Solution: pgmq_read_specific_message()
CREATE OR REPLACE FUNCTION pgmq_read_specific_message(
queue_name text,
target_msg_id bigint,
vt_seconds integer DEFAULT 30
) RETURNS TABLE (
msg_id bigint,
read_ct integer,
enqueued_at timestamp with time zone,
vt timestamp with time zone,
message jsonb
) AS $$
Key Problem-Solving Logic:
- Atomic Claim with Visibility Timeout: Uses UPDATE…RETURNING pattern to atomically:
  - Check if message is available (vt <= now())
  - Set new visibility timeout preventing other workers from claiming
  - Increment read count for monitoring retry attempts
  - Return message data only if successfully claimed
- Race Condition Prevention: The WHERE vt <= now() clause ensures only one worker can claim a message. If two workers try simultaneously, only one UPDATE succeeds.
- Graceful Failure Handling: Returns empty result set if message is:
  - Already claimed by another worker (vt > now())
  - Non-existent (deleted or never existed)
  - Archived (moved to archive table)
- Security: Validates queue name to prevent SQL injection in dynamic query construction.
Real-World Impact: Eliminates “message not found” errors that occurred when a worker was notified about a specific message but the standard read returned a different one.
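The claim logic can be modeled in memory to show why a second worker comes back empty-handed. This Rust sketch is an illustration under assumptions: QueuedMessage and claim_specific are hypothetical names, timestamps are plain seconds, and a HashMap replaces the queue table, so the atomicity that UPDATE…RETURNING provides in PostgreSQL is not reproduced here.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for one row of a PGMQ queue table.
pub struct QueuedMessage {
    pub vt: u64,      // visibility timeout as seconds since an arbitrary epoch
    pub read_ct: u32, // how many times the message has been claimed
    pub payload: String,
}

/// Claims one specific message if it is currently visible.
/// Returns None when the message is missing or still claimed by someone else.
pub fn claim_specific(
    queue: &mut HashMap<i64, QueuedMessage>,
    msg_id: i64,
    now: u64,
    vt_seconds: u64,
) -> Option<String> {
    let message = queue.get_mut(&msg_id)?;
    // Same test as WHERE vt <= now(): a future vt means another worker holds it.
    if message.vt > now {
        return None;
    }
    // Hide the message from other workers and record the read.
    message.vt = now + vt_seconds;
    message.read_ct += 1;
    Some(message.payload.clone())
}
```

If the first worker claims the message at time 100 with a 30-second timeout, a second attempt at time 110 returns None, and the message becomes claimable again at time 130 if it was never deleted.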
Task State Ownership and Atomic Transitions
Problem: Concurrent Orchestrators Processing Same Task
In distributed deployments, multiple orchestrator instances might try to process the same task simultaneously, leading to duplicate work, inconsistent state, and race conditions.
Solution: transition_task_state_atomic()
CREATE OR REPLACE FUNCTION transition_task_state_atomic(
p_task_uuid UUID,
p_from_state VARCHAR,
p_to_state VARCHAR,
p_processor_uuid UUID,
p_metadata JSONB DEFAULT '{}'
) RETURNS BOOLEAN AS $$
Key Problem-Solving Logic:
- Compare-and-Swap Pattern:
  - Reads current state with FOR UPDATE lock
  - Only transitions if current state matches expected from_state
  - Returns false if state has changed, allowing caller to retry with fresh state
- Processor Ownership Enforcement:
  CASE
      WHEN cs.to_state IN ('initializing', 'enqueuing_steps',
                           'steps_in_process', 'evaluating_results')
      THEN cs.processor_uuid = p_processor_uuid OR cs.processor_uuid IS NULL
      ELSE true
  END
  - Active processing states require ownership match
  - Allows claiming unowned tasks (NULL processor_uuid)
  - Terminal states (complete, error) don’t require ownership
- Audit Trail Preservation:
  - Updates previous transition’s most_recent = false
  - Inserts new transition with most_recent = true
  - Maintains complete history with sort_key ordering
- Atomic Success/Failure: Returns boolean indicating whether transition succeeded, enabling callers to handle contention gracefully.
Real-World Impact: Enables safe distributed orchestration where multiple instances can operate without conflicts, automatically distributing work through ownership claiming.
Batch Task Discovery with Priority
Problem: Efficient Work Distribution Across Orchestrators
Orchestrators need to discover ready tasks efficiently without creating hotspots or missing tasks, while respecting priority and avoiding claimed tasks.
Solution: get_next_ready_tasks()
CREATE OR REPLACE FUNCTION get_next_ready_tasks(p_limit INTEGER DEFAULT 5)
RETURNS TABLE(
task_uuid UUID,
task_name TEXT,
priority INTEGER,
namespace_name TEXT,
ready_steps_count BIGINT,
computed_priority NUMERIC,
current_state VARCHAR
)
Key Problem-Solving Logic:
- Ready Step Discovery:
  WITH ready_steps AS (
      SELECT task_uuid, COUNT(*) as ready_count
      FROM workflow_steps
      WHERE current_state IN ('pending', 'error')
        AND [dependency checks]
      GROUP BY task_uuid
  )
  - Pre-aggregates ready steps per task for efficiency
  - Considers both new steps and retryable errors
- State-Based Filtering:
  - Only returns tasks in states that need processing
  - Excludes terminal states (complete, cancelled)
  - Includes waiting states that might have become ready
- Priority Computation:
  computed_priority = base_priority + (age_factor * hours_waiting) + (retry_factor * retry_count)
  - Dynamic priority based on age and retry attempts
  - Prevents task starvation through age escalation
- Batch Efficiency:
  - Returns multiple tasks in single query
  - Reduces database round trips
  - Enables parallel processing across orchestrators
Real-World Impact: Enables efficient work distribution where each orchestrator can claim a batch of tasks, reducing contention and improving throughput.
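The priority formula above is easy to check with concrete numbers. The Rust sketch below implements it directly; the function name computed_priority mirrors the column name, and the factor values used in the example are assumptions chosen for illustration, since this document does not state the configured ones.

```rust
/// Sketch of: base_priority + (age_factor * hours_waiting) + (retry_factor * retry_count).
/// The factor values are supplied by the caller; none are prescribed here.
pub fn computed_priority(
    base_priority: i32,
    hours_waiting: f64,
    retry_count: u32,
    age_factor: f64,
    retry_factor: f64,
) -> f64 {
    f64::from(base_priority)
        + age_factor * hours_waiting
        + retry_factor * f64::from(retry_count)
}
```

With an assumed age_factor of 0.5, a priority-1 task that has waited 10 hours scores 6.0 and overtakes a fresh priority-5 task at 5.0, which is how age escalation prevents starvation.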
Complex Dependency Resolution
Problem: Determining Step Execution Readiness
Workflow steps have complex dependencies involving parent completion, retry logic, backoff timing, and state validation. Determining which steps are ready for execution requires sophisticated dependency analysis that must handle:
- Multiple parent dependencies with conditional logic
- Exponential backoff after failures
- Retry limits and attempt tracking
- State consistency across distributed workers
Solution: get_step_readiness_status()
CREATE OR REPLACE FUNCTION get_step_readiness_status(
input_task_uuid UUID,
step_uuids UUID[] DEFAULT NULL
) RETURNS TABLE(
workflow_step_uuid UUID,
task_uuid UUID,
named_step_uuid UUID,
name VARCHAR,
current_state VARCHAR,
dependencies_satisfied BOOLEAN,
retry_eligible BOOLEAN,
ready_for_execution BOOLEAN,
-- ... additional metadata
)
Key Problem-Solving Logic:
- Dependency Satisfaction Analysis:
  WITH parent_completion AS (
      SELECT
          edge.to_step_uuid,
          COUNT(*) as total_parents,
          COUNT(CASE WHEN parent.current_state = 'complete' THEN 1 END) as completed_parents
      FROM workflow_step_edges edge
      JOIN workflow_steps parent ON parent.workflow_step_uuid = edge.from_step_uuid
      WHERE parent.task_uuid = input_task_uuid
      GROUP BY edge.to_step_uuid
  )
  - Counts total vs. completed parent dependencies
  - Handles conditional dependencies based on parent results
  - Supports complex DAG structures with multiple paths
- Retry Eligibility Assessment:
  retry_eligible = (
      current_state = 'error'
      AND attempts < retry_limit
      AND (last_attempted_at IS NULL OR last_attempted_at + backoff_interval <= NOW())
  )
  - Enforces retry limits to prevent infinite loops
  - Calculates exponential backoff: 2^attempts seconds (max 30)
  - Respects custom backoff periods from step configuration
  - Considers temporal constraints for retry timing
- State Validation:
  ready_for_execution = (
      current_state IN ('pending', 'error')
      AND dependencies_satisfied
      AND retry_eligible
  )
  - Only pending or retryable error steps can execute
  - Requires all dependencies satisfied
  - Must pass retry eligibility checks
  - Prevents execution of steps in terminal states
- Backoff Calculation:
  next_retry_at = CASE
      WHEN current_state = 'error' AND attempts > 0
      THEN last_attempted_at + INTERVAL '1 second' *
           COALESCE(backoff_request_seconds, LEAST(POW(2, attempts), 30))
      ELSE NULL
  END
  - Custom backoff from step configuration takes precedence
  - Default exponential backoff with maximum cap
  - Temporal precision for scheduling retry attempts
Real-World Impact: Enables complex workflow orchestration with sophisticated dependency management, retry logic, and backoff handling, supporting enterprise-grade reliability patterns while maintaining high performance through set-based operations.
Integration with Event and State Systems
PostgreSQL LISTEN/NOTIFY Integration
The SQL functions integrate with the event-driven architecture through PostgreSQL notifications:
PGMQ Wrapper Functions for Atomic Operations
The system uses wrapper functions that combine PGMQ message sending with PostgreSQL notifications atomically:
-- Atomic wrapper that sends message AND notification
CREATE OR REPLACE FUNCTION pgmq_send_with_notify(
queue_name TEXT,
message JSONB,
delay_seconds INTEGER DEFAULT 0
) RETURNS BIGINT AS $$
DECLARE
msg_id BIGINT;
namespace_name TEXT;
event_payload TEXT;
namespace_channel TEXT;
global_channel TEXT := 'pgmq_message_ready';
BEGIN
-- Send message using PGMQ's native function
SELECT pgmq.send(queue_name, message, delay_seconds) INTO msg_id;
-- Extract namespace from queue name using robust helper
namespace_name := extract_queue_namespace(queue_name);
-- Build namespace-specific channel name
namespace_channel := 'pgmq_message_ready.' || namespace_name;
-- Build event payload
event_payload := json_build_object(
'event_type', 'message_ready',
'msg_id', msg_id,
'queue_name', queue_name,
'namespace', namespace_name,
'ready_at', NOW()::timestamptz,
'delay_seconds', delay_seconds
)::text;
-- Send notifications in same transaction
PERFORM pg_notify(namespace_channel, event_payload);
-- Also send to global channel if different
IF namespace_channel != global_channel THEN
PERFORM pg_notify(global_channel, event_payload);
END IF;
RETURN msg_id;
END;
$$ LANGUAGE plpgsql;
Namespace Extraction Helper
-- Robust namespace extraction helper function
CREATE OR REPLACE FUNCTION extract_queue_namespace(queue_name TEXT)
RETURNS TEXT AS $$
BEGIN
-- Handle orchestration queues
IF queue_name ~ '^orchestration' THEN
RETURN 'orchestration';
END IF;
-- Handle worker queues: worker_namespace_queue -> namespace
IF queue_name ~ '^worker_.*_queue$' THEN
RETURN COALESCE(
(regexp_match(queue_name, '^worker_(.+?)_queue$'))[1],
'worker'
);
END IF;
-- Handle standard namespace_queue pattern
IF queue_name ~ '^[a-zA-Z][a-zA-Z0-9_]*_queue$' THEN
RETURN COALESCE(
(regexp_match(queue_name, '^([a-zA-Z][a-zA-Z0-9_]*)_queue$'))[1],
'default'
);
END IF;
-- Fallback for any other pattern
RETURN 'default';
END;
$$ LANGUAGE plpgsql;
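For consumers that need to predict which channel a queue will notify on, the extraction rules can be mirrored on the Rust side. The sketch below is an approximation written for illustration, not code from the project: it replaces the regular expressions with prefix and suffix checks, which should agree with the SQL for ordinary single-line queue names.

```rust
/// Hypothetical Rust mirror of the SQL helper extract_queue_namespace.
pub fn extract_queue_namespace(queue_name: &str) -> String {
    // Orchestration queues all share one namespace.
    if queue_name.starts_with("orchestration") {
        return "orchestration".to_string();
    }
    // worker_<namespace>_queue -> <namespace>, or "worker" if the middle is empty.
    if let Some(inner) = queue_name
        .strip_prefix("worker_")
        .and_then(|rest| rest.strip_suffix("_queue"))
    {
        return if inner.is_empty() {
            "worker".to_string()
        } else {
            inner.to_string()
        };
    }
    // <namespace>_queue, where the namespace starts with a letter and
    // contains only ASCII letters, digits, or underscores.
    if let Some(base) = queue_name.strip_suffix("_queue") {
        let mut chars = base.chars();
        let starts_with_letter = chars.next().map_or(false, |c| c.is_ascii_alphabetic());
        let rest_is_valid = chars.all(|c| c.is_ascii_alphanumeric() || c == '_');
        if starts_with_letter && rest_is_valid {
            return base.to_string();
        }
    }
    // Anything else falls back to the default namespace.
    "default".to_string()
}
```

A listener could then subscribe to the channel built by prefixing the result with pgmq_message_ready and a dot, matching the channel name that pgmq_send_with_notify constructs.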
Fallback Polling for Task Readiness
Instead of database triggers for task readiness notifications, the system uses a fallback polling mechanism to ensure no ready tasks are missed:
FallbackPoller Configuration:
- Default polling interval: 30 seconds
- Runs StepEnqueuerService::process_batch() periodically
- Catches tasks that may have been missed by primary PGMQ notification system
- Configurable enable/disable via TOML configuration
Key Benefits:
- Resilience: Ensures no tasks are permanently stuck if notifications fail
- Simplicity: No complex database triggers or state tracking required
- Observability: Clear metrics on fallback discovery vs. event-driven discovery
- Safety Net: Primary event-driven system + fallback polling provides redundancy
PGMQ Message Queue Integration
SQL functions coordinate with PGMQ for reliable message processing:
Queue Management Functions
-- Ensure queue exists with proper configuration
CREATE OR REPLACE FUNCTION ensure_task_queue(queue_name VARCHAR)
RETURNS BOOLEAN AS $$
BEGIN
-- Create queue if it doesn't exist
PERFORM pgmq.create_queue(queue_name);
-- Ensure headers column exists (pgmq-rs compatibility)
PERFORM pgmq_ensure_headers_column(queue_name);
RETURN TRUE;
END;
$$ LANGUAGE plpgsql;
Message Processing Support
-- Get queue statistics for monitoring
-- The parameter is named p_queue_name because PostgreSQL rejects a parameter
-- name that is also used as a RETURNS TABLE column
CREATE OR REPLACE FUNCTION get_queue_statistics(p_queue_name VARCHAR)
RETURNS TABLE(
queue_name VARCHAR,
queue_length BIGINT,
oldest_msg_age_seconds INTEGER,
newest_msg_age_seconds INTEGER
) AS $$
BEGIN
RETURN QUERY
SELECT
p_queue_name,
pgmq.queue_length(p_queue_name),
EXTRACT(EPOCH FROM (NOW() - MIN(m.enqueued_at)))::INTEGER,
EXTRACT(EPOCH FROM (NOW() - MAX(m.enqueued_at)))::INTEGER
FROM pgmq.messages(p_queue_name) AS m;
END;
$$ LANGUAGE plpgsql;
Transaction Safety
All SQL functions are designed with transaction safety in mind:
Atomic Operations:
- State transitions use row-level locking (FOR UPDATE)
- Compare-and-swap patterns prevent race conditions
- Rollback safety for partial failures
Consistency Guarantees:
- Foreign key constraints maintained across all operations
- Check constraints validate state transitions
- Audit trails preserved for debugging and compliance
Performance Optimization:
- Efficient indexes for common query patterns
- Materialized views for expensive analytics queries
- Connection pooling for high concurrency
Usage Patterns and Best Practices
Rust Integration Patterns
The SqlFunctionExecutor provides type-safe access to all SQL functions:
use tasker_shared::database::sql_functions::{SqlFunctionExecutor, FunctionRegistry};

// Direct executor usage (clone the pool handle so it can be reused below)
let executor = SqlFunctionExecutor::new(pool.clone());
let ready_steps = executor.get_ready_steps(task_uuid).await?;

// Registry pattern for organized access
let registry = FunctionRegistry::new(pool);
let analytics = registry.analytics().get_analytics_metrics(None).await?;
let health = registry.system_health().get_system_health_counts().await?;
Batch Processing Optimization
For high-throughput scenarios, the system supports efficient batch operations:
// Batch step readiness analysis
let task_uuids = vec![task1_uuid, task2_uuid, task3_uuid];
let batch_readiness = executor.get_step_readiness_status_batch(task_uuids).await?;

// Batch task discovery
let ready_tasks = executor.get_next_ready_tasks(50).await?;
Error Handling Best Practices
SQL function errors are properly propagated through the type system:
match executor.get_current_task_state(task_uuid).await {
    Ok(state) => {
        // Process state
    }
    Err(sqlx::Error::RowNotFound) => {
        // Handle missing task
    }
    Err(e) => {
        // Handle other database errors
    }
}