Events and Commands Architecture
Last Updated: 2026-01-15 Audience: Architects, Developers Status: Active Related Docs: Documentation Hub | Actor-Based Architecture | Messaging Abstraction | States and Lifecycles | Deployment Patterns
← Back to Documentation Hub
This document provides comprehensive documentation of the event-driven and command pattern architecture in tasker-core, covering the unified event system foundation, orchestration and worker implementations, and the flow of tasks and steps through the system.
Overview
The tasker-core system implements a sophisticated hybrid architecture that combines:
- Event-Driven Systems: Real-time coordination using PostgreSQL LISTEN/NOTIFY and PGMQ notifications
- Command Pattern: Async command processors using tokio mpsc channels for orchestration and worker operations
- Hybrid Deployment Modes: PollingOnly, EventDrivenOnly, and Hybrid modes with fallback polling
- Queue-Based Communication: Provider-agnostic message queues (PGMQ or RabbitMQ) for reliable step execution and result processing
This architecture eliminates polling complexity while maintaining resilience through fallback mechanisms and provides horizontal scaling capabilities with atomic operation guarantees.
Event System Foundation
EventDrivenSystem Trait
The foundation of the event architecture is defined in tasker-shared/src/event_system/event_driven.rs with the EventDrivenSystem trait:
#![allow(unused)]
fn main() {
#[async_trait]
pub trait EventDrivenSystem: Send + Sync {
type SystemId: Send + Sync + Clone + fmt::Display + fmt::Debug;
type Event: Send + Sync + Clone + fmt::Debug;
type Config: Send + Sync + Clone;
type Statistics: EventSystemStatistics + Send + Sync + Clone;
// Core lifecycle methods
async fn start(&mut self) -> Result<(), DeploymentModeError>;
async fn stop(&mut self) -> Result<(), DeploymentModeError>;
fn is_running(&self) -> bool;
// Event processing
async fn process_event(&self, event: Self::Event) -> Result<(), DeploymentModeError>;
// Monitoring and health
async fn health_check(&self) -> Result<DeploymentModeHealthStatus, DeploymentModeError>;
fn statistics(&self) -> Self::Statistics;
// Configuration
fn deployment_mode(&self) -> DeploymentMode;
fn config(&self) -> &Self::Config;
}
}
Deployment Modes
The system supports three deployment modes for different operational requirements:
PollingOnly Mode
- Traditional polling-based coordination
- No event listeners or real-time notifications
- Reliable fallback for environments with networking restrictions
- Higher latency but guaranteed operation
EventDrivenOnly Mode
- Pure event-driven coordination using PostgreSQL LISTEN/NOTIFY
- Real-time response to database changes
- Lowest latency for step discovery and task coordination
- Requires reliable PostgreSQL connections
Hybrid Mode
- Primary event-driven coordination with polling fallback
- Best of both worlds: real-time when possible, reliable when needed
- Automatic fallback during connection issues
- Production-ready with resilience guarantees
Selecting a Deployment Mode
The Tasker system is built with the expectation of distributed deployment with multiple instances of both orchestration core servers and worker servers operating simultaneously. The goal of separating deployment mode is to enable different deployments to scale up event driven only processing nodes to meet demand, while having polling only nodes at a reasonable fallback polling interval and batch size. It is also to deploy in hybrid mode and control these on an instance over instance level.
Event Types and Sources
Queue-Level Events (Provider-Agnostic)
The system supports multiple messaging backends through MessageNotification:
#![allow(unused)]
fn main() {
pub enum MessageNotification {
/// Signal-only notification (PGMQ style)
/// Indicates a message is available but requires separate fetch
Available {
queue_name: String,
msg_id: Option<i64>,
},
/// Full message notification (RabbitMQ style)
/// Contains the complete message payload
Message(QueuedMessage<Vec<u8>>),
}
}
Event Sources by Provider:
| Provider | Notification Type | Fetch Required | Fallback Polling |
|---|---|---|---|
| PGMQ | Available | Yes (read by msg_id) | Required |
| RabbitMQ | Message | No (full payload) | Not needed |
| InMemory | Message | No | Not needed |
Common Event Types:
- Step Results: Worker completion notifications
- Task Requests: New task initialization requests
- Message Ready Events: Queue message availability notifications
- Transport: Provider-agnostic via
MessagingProvider.subscribe_many()
Command Pattern Architecture
Command Processor Pattern
Both orchestration and worker systems implement the command pattern to replace complex polling-based coordinators:
Benefits:
- No Polling Loops (Except where intended for fallback): Pure tokio mpsc command processing
- Simplified Architecture: ~100 lines vs 1000+ lines of complex systems
- Race Condition Prevention: Atomic operations through proper delegation
- Observability Preservation: Maintains metrics through delegated components
Command Flow Patterns
Both systems follow consistent command processing patterns:
sequenceDiagram
participant Client
participant CommandChannel
participant Processor
participant Delegate
participant Response
Client->>CommandChannel: Send Command + ResponseChannel
CommandChannel->>Processor: Receive Command
Processor->>Delegate: Delegate to Business Logic Component
Delegate-->>Processor: Return Result
Processor->>Response: Send Result via ResponseChannel
Response-->>Client: Receive Result
Orchestration Event Systems
OrchestrationEventSystem
Implemented in tasker-orchestration/src/orchestration/event_systems/orchestration_event_system.rs:
#![allow(unused)]
fn main() {
pub struct OrchestrationEventSystem {
system_id: String,
deployment_mode: DeploymentMode,
queue_listener: Option<OrchestrationQueueListener>,
fallback_poller: Option<OrchestrationFallbackPoller>,
context: Arc<SystemContext>,
orchestration_core: Arc<OrchestrationCore>,
command_sender: mpsc::Sender<OrchestrationCommand>,
// ... statistics and state
}
}
Orchestration Command Types
The command processor handles both full-message and signal-only notification types:
#![allow(unused)]
fn main() {
pub enum OrchestrationCommand {
// Task lifecycle
InitializeTask { request: TaskRequestMessage, resp: CommandResponder<TaskInitializeResult> },
ProcessStepResult { result: StepExecutionResult, resp: CommandResponder<StepProcessResult> },
FinalizeTask { task_uuid: Uuid, resp: CommandResponder<TaskFinalizationResult> },
// Full message processing (RabbitMQ style - MessageNotification::Message)
// Used when provider delivers complete message payload
ProcessStepResultFromMessage { queue_name: String, message: QueuedMessage, resp: CommandResponder<StepProcessResult> },
InitializeTaskFromMessage { queue_name: String, message: QueuedMessage, resp: CommandResponder<TaskInitializeResult> },
FinalizeTaskFromMessage { queue_name: String, message: QueuedMessage, resp: CommandResponder<TaskFinalizationResult> },
// Signal-only processing (PGMQ style - MessageNotification::Available)
// Used when provider sends notification that requires separate fetch
ProcessStepResultFromMessageEvent { message_event: MessageReadyEvent, resp: CommandResponder<StepProcessResult> },
InitializeTaskFromMessageEvent { message_event: MessageReadyEvent, resp: CommandResponder<TaskInitializeResult> },
FinalizeTaskFromMessageEvent { message_event: MessageReadyEvent, resp: CommandResponder<TaskFinalizationResult> },
// Task readiness (database events)
ProcessTaskReadiness { task_uuid: Uuid, namespace: String, priority: i32, ready_steps: i32, triggered_by: String, resp: CommandResponder<TaskReadinessResult> },
// System operations
GetProcessingStats { resp: CommandResponder<OrchestrationProcessingStats> },
HealthCheck { resp: CommandResponder<SystemHealth> },
Shutdown { resp: CommandResponder<()> },
}
}
Command Routing by Notification Type:
MessageNotification::Message->*FromMessagecommands (immediate processing)MessageNotification::Available->*FromMessageEventcommands (requires fetch)
Orchestration Queue Architecture
The orchestration system coordinates multiple queue types:
- orchestration_step_results: Step completion results from workers
- orchestration_task_requests: New task initialization requests
- orchestration_task_finalization: Task finalization notifications
- Namespace Queues: Per-namespace step queues (e.g.,
fulfillment_queue,inventory_queue)
TaskReadinessEventSystem
Handles database-level events for task readiness using PostgreSQL LISTEN/NOTIFY:
#![allow(unused)]
fn main() {
pub struct TaskReadinessEventSystem {
system_id: String,
deployment_mode: DeploymentMode,
listener: Option<TaskReadinessListener>,
fallback_poller: Option<TaskReadinessFallbackPoller>,
context: Arc<SystemContext>,
command_sender: mpsc::Sender<OrchestrationCommand>,
// ... configuration and statistics
}
}
PGMQ Notification Channels:
pgmq_message_ready.orchestration: Orchestration queue messages ready (task requests, step results, finalizations)pgmq_message_ready.{namespace}: Worker namespace queue messages ready (e.g.,payments,fulfillment,linear_workflow)pgmq_message_ready: Global channel for all queue messages (fallback)pgmq_queue_created: Queue creation notifications
Unified Event Coordination
The UnifiedEventCoordinator demonstrates coordinated management of multiple event systems:
#![allow(unused)]
fn main() {
pub struct UnifiedEventCoordinator {
orchestration_system: OrchestrationEventSystem,
task_readiness_fallback: FallbackPoller,
deployment_mode: DeploymentMode,
health_monitor: EventSystemHealthMonitor,
// ... coordination logic
}
}
Coordination Features:
- Shared Command Channel: Both systems send commands to same orchestration processor
- Health Monitoring: Unified health checking across all event systems
- Deployment Mode Management: Synchronized mode changes
- Statistics Aggregation: Combined metrics from all systems
Worker Event Systems
WorkerEventSystem
Implemented in tasker-worker/src/worker/event_systems/worker_event_system.rs:
#![allow(unused)]
fn main() {
pub struct WorkerEventSystem {
system_id: String,
deployment_mode: DeploymentMode,
queue_listeners: HashMap<String, WorkerQueueListener>,
fallback_pollers: HashMap<String, WorkerFallbackPoller>,
context: Arc<SystemContext>,
command_sender: mpsc::Sender<WorkerCommand>,
// ... statistics and configuration
}
}
Worker Command Types
#![allow(unused)]
fn main() {
pub enum WorkerCommand {
// Step execution
ExecuteStep { message: PgmqMessage<SimpleStepMessage>, queue_name: String, resp: CommandResponder<()> },
ExecuteStepWithCorrelation { message: PgmqMessage<SimpleStepMessage>, queue_name: String, correlation_id: Uuid, resp: CommandResponder<()> },
// Result processing
SendStepResult { result: StepExecutionResult, resp: CommandResponder<()> },
ProcessStepCompletion { step_result: StepExecutionResult, correlation_id: Option<Uuid>, resp: CommandResponder<()> },
// Event integration
ExecuteStepFromMessage { queue_name: String, message: PgmqMessage, resp: CommandResponder<()> },
ExecuteStepFromEvent { message_event: MessageReadyEvent, resp: CommandResponder<()> },
// System operations
GetWorkerStatus { resp: CommandResponder<WorkerStatus> },
SetEventIntegration { enabled: bool, resp: CommandResponder<()> },
GetEventStatus { resp: CommandResponder<EventIntegrationStatus> },
RefreshTemplateCache { namespace: Option<String>, resp: CommandResponder<()> },
HealthCheck { resp: CommandResponder<WorkerHealthStatus> },
Shutdown { resp: CommandResponder<()> },
}
}
Worker Queue Architecture
Workers monitor namespace-specific queues for step execution as Custom Namespace Queues that are dynamically configured per deployment
Example queues:
- fulfillment_queue: All fulfillment namespace steps
- inventory_queue: All inventory namespace steps
- notifications_queue: All notification namespace steps
- payment_queue: All payment processing steps
Event Flow and System Interactions
Complete Task Execution Flow
sequenceDiagram
participant Client
participant Orchestration
participant TaskDB
participant StepQueue
participant Worker
participant ResultQueue
%% Task Initialization
Client->>Orchestration: TaskRequestMessage (via pgmq_send_with_notify)
Orchestration->>TaskDB: Create Task + Steps
%% Step Discovery and Enqueueing (Event-Driven or Fallback Polling)
Orchestration->>StepQueue: pgmq_send_with_notify(ready steps)
StepQueue-->>Worker: pg_notify('pgmq_message_ready.{namespace}')
%% Step Execution
Worker->>StepQueue: pgmq.read() to claim step
Worker->>Worker: Execute Step Handler
Worker->>ResultQueue: pgmq_send_with_notify(StepExecutionResult)
ResultQueue-->>Orchestration: pg_notify('pgmq_message_ready.orchestration')
%% Result Processing
Orchestration->>Orchestration: ProcessStepResult Command
Orchestration->>TaskDB: Update Step State
Note over Orchestration: Fallback poller discovers ready steps if events missed
%% Task Completion
Note over Orchestration: All Steps Complete
Orchestration->>Orchestration: FinalizeTask Command
Orchestration->>TaskDB: Mark Task Complete
Orchestration-->>Client: Task Completed
Event-Driven Step Discovery
sequenceDiagram
participant Worker
participant PostgreSQL
participant PgmqNotify
participant OrchestrationListener
participant StepEnqueuer
Worker->>PostgreSQL: pgmq_send_with_notify('orchestration_step_results', result)
PostgreSQL->>PostgreSQL: Atomic: pgmq.send() + pg_notify()
PostgreSQL->>PgmqNotify: NOTIFY 'pgmq_message_ready.orchestration'
PgmqNotify->>OrchestrationListener: MessageReadyEvent
OrchestrationListener->>StepEnqueuer: ProcessStepResult Command
StepEnqueuer->>PostgreSQL: Query ready steps, enqueue via pgmq_send_with_notify()
Hybrid Mode Operation
stateDiagram-v2
[*] --> EventDriven
EventDriven --> Processing : Event Received
Processing --> EventDriven : Success
Processing --> PollingFallback : Event Failed
PollingFallback --> FallbackPolling : Start Polling
FallbackPolling --> EventDriven : Connection Restored
FallbackPolling --> Processing : Poll Found Work
EventDriven --> HealthCheck : Periodic Check
HealthCheck --> EventDriven : Healthy
HealthCheck --> PollingFallback : Event Issues Detected
Queue Architecture and Message Flow
PGMQ Integration
The system uses PostgreSQL Message Queue (PGMQ) for reliable message delivery:
Queue Types and Purposes
| Queue Name | Purpose | Message Type | Processing System |
|---|---|---|---|
orchestration_step_results | Step completion results | StepExecutionResult | Orchestration |
orchestration_task_requests | New task requests | TaskRequestMessage | Orchestration |
orchestration_task_finalization | Task finalization | TaskFinalizationMessage | Orchestration |
{namespace}_queue | Namespace-specific steps | SimpleStepMessage | Workers |
Message Processing Patterns
Event-Driven Processing:
- Message arrives in PGMQ queue
- PostgreSQL triggers pg_notify with
MessageReadyEvent - Event system receives notification
- System processes message via command pattern
- Message deleted after successful processing
Polling-Based Processing (Fallback):
- Periodic queue polling (configurable interval)
- Fetch available messages in batches
- Process messages via command pattern
- Delete processed messages
Circuit Breaker Integration
All PGMQ operations are protected by circuit breakers:
#![allow(unused)]
fn main() {
pub struct UnifiedPgmqClient {
standard_client: Box<dyn PgmqClientTrait + Send + Sync>,
protected_client: Option<ProtectedPgmqClient>,
circuit_breaker_enabled: bool,
}
}
Circuit Breaker Features:
- Automatic Protection: Failure detection and circuit opening
- Configurable Thresholds: Error rate and timeout configuration
- Seamless Fallback: Automatic switching between standard and protected clients
- Recovery Detection: Automatic circuit closing when service recovers
Statistics and Monitoring
Event System Statistics
Both orchestration and worker event systems implement comprehensive statistics:
#![allow(unused)]
fn main() {
pub trait EventSystemStatistics {
fn events_processed(&self) -> u64;
fn events_failed(&self) -> u64;
fn processing_rate(&self) -> f64; // events/second
fn average_latency_ms(&self) -> f64;
fn deployment_mode_score(&self) -> f64; // 0.0-1.0 effectiveness
fn success_rate(&self) -> f64; // derived: processed/(processed+failed)
}
}
Health Monitoring
Deployment Mode Health Status
#![allow(unused)]
fn main() {
pub enum DeploymentModeHealthStatus {
Healthy, // All systems operational
Degraded { reason: String },// Some issues but functional
Unhealthy { reason: String },// Significant issues
Critical { reason: String }, // System failure imminent
}
}
Health Check Integration
- Event System Health: Connection status, processing latency, error rates
- Command Processor Health: Queue backlog, processing timeout detection
- Database Health: Connection pool status, query performance
- Circuit Breaker Status: Circuit state, failure rates, recovery status
Metrics Collection
Key metrics collected across the system:
Orchestration Metrics
- Task Initialization Rate: Tasks/minute initialized
- Step Enqueueing Rate: Steps/minute enqueued to worker queues
- Result Processing Rate: Results/minute processed from workers
- Task Completion Rate: Tasks/minute completed successfully
- Error Rates: Failures by operation type and cause
Worker Metrics
- Step Execution Rate: Steps/minute executed
- Handler Performance: Execution time by handler type
- Queue Processing: Messages claimed/processed by queue
- Result Submission Rate: Results/minute sent to orchestration
- FFI Integration: Event correlation and handler communication stats
Error Handling and Resilience
Error Categories
The system handles multiple error categories with appropriate strategies:
Transient Errors
- Database Connection Issues: Circuit breaker protection + retry with exponential backoff
- Queue Processing Failures: Message retry with backoff, poison message detection
- Network Interruptions: Automatic fallback to polling mode
Permanent Errors
- Invalid Message Format: Dead letter queue for manual analysis
- Handler Execution Failures: Step failure state with retry limits
- Configuration Errors: System startup prevention with clear error messages
System Errors
- Resource Exhaustion: Graceful degradation and load shedding
- Component Crashes: Automatic restart with state recovery
- Data Corruption: Transaction rollback and consistency validation
Fallback Mechanisms
Event System Fallbacks
- Event-Driven -> Polling: Automatic fallback when event connection fails
- Real-time -> Batch: Switch to batch processing during high load
- Primary -> Secondary: Database failover support for high availability
Command Processing Fallbacks
- Async -> Sync: Degraded operation for critical operations
- Distributed -> Local: Local processing when coordination fails
- Optimistic -> Pessimistic: Conservative processing during uncertainty
Configuration Management
Event System Configuration
Event systems are configured via TOML with environment overrides:
# config/tasker/base/event_systems.toml
[orchestration_event_system]
system_id = "orchestration-events"
deployment_mode = "Hybrid"
health_monitoring_enabled = true
health_check_interval = "30s"
max_concurrent_processors = 10
processing_timeout = "100ms"
[orchestration_event_system.queue_listener]
enabled = true
batch_size = 50
poll_interval = "1s"
connection_timeout = "5s"
[orchestration_event_system.fallback_poller]
enabled = true
poll_interval = "5s"
batch_size = 20
max_retry_attempts = 3
[task_readiness]
enabled = true
polling_interval_seconds = 30
[orchestration_event_system]
system_id = "orchestration-events"
deployment_mode = "Hybrid"
# PGMQ channels handled by listeners, not direct postgres channels
supported_namespaces = ["orchestration"]
Runtime Configuration Changes
Certain configuration changes can be applied at runtime:
- Deployment Mode Switching: EventDrivenOnly <-> Hybrid <-> PollingOnly
- Event Integration Toggle: Enable/disable event processing
- Health Check Intervals: Adjust monitoring frequency
- Circuit Breaker Thresholds: Modify failure detection sensitivity
Integration Points
State Machine Integration
Event systems integrate tightly with the state machines documented in states-and-lifecycles.md:
- Task State Changes: Event systems react to task transitions
- Step State Changes: Step completion triggers task readiness checks
- Event Generation: State transitions generate events for system coordination
- Atomic Operations: Event processing maintains state machine consistency
Database Integration
Event systems coordinate with PostgreSQL at multiple levels:
- LISTEN/NOTIFY: Real-time notifications for database changes
- PGMQ Integration: Reliable message queues built on PostgreSQL
- Transaction Coordination: Event processing within database transactions
- SQL Functions: Database functions generate events and notifications
External System Integration
The event architecture supports integration with external systems:
- Webhook Events: HTTP callbacks for external system notifications
- Message Bus Integration: Apache Kafka, RabbitMQ, etc. for enterprise messaging
- Monitoring Integration: Prometheus, DataDog, etc. for metrics export
- API Integration: REST and GraphQL APIs for external coordination
Actor Integration
Overview
The tasker-core system implements a lightweight Actor pattern that formalizes the relationship between Commands and Lifecycle Components. This architecture provides a consistent, type-safe foundation for orchestration component management with all lifecycle operations coordinated through actors.
Status: Complete (Phases 1-7) - Production ready
For comprehensive actor documentation, see Actor-Based Architecture.
Actor Pattern Basics
The actor pattern introduces three core traits:
- OrchestrationActor: Base trait for all actors with lifecycle hooks
- Handler
: Message handling trait for type-safe command processing - Message: Marker trait for command messages
#![allow(unused)]
fn main() {
// Actor definition
pub struct TaskFinalizerActor {
context: Arc<SystemContext>,
service: TaskFinalizer,
}
// Message definition
pub struct FinalizeTaskMessage {
pub task_uuid: Uuid,
}
impl Message for FinalizeTaskMessage {
type Response = FinalizationResult;
}
// Message handler
#[async_trait]
impl Handler<FinalizeTaskMessage> for TaskFinalizerActor {
type Response = FinalizationResult;
async fn handle(&self, msg: FinalizeTaskMessage) -> TaskerResult<Self::Response> {
self.service.finalize_task(msg.task_uuid).await
.map_err(|e| e.into())
}
}
}
Integration with Command Processor
The actor pattern integrates seamlessly with the command processor through direct actor calls:
#![allow(unused)]
fn main() {
// From: tasker-orchestration/src/orchestration/command_processor.rs
async fn handle_finalize_task(&self, task_uuid: Uuid) -> TaskerResult<TaskFinalizationResult> {
// Direct actor-based task finalization
let msg = FinalizeTaskMessage { task_uuid };
let result = self.actors.task_finalizer_actor.handle(msg).await?;
Ok(TaskFinalizationResult::Success {
task_uuid: result.task_uuid,
final_status: format!("{:?}", result.action),
completion_time: Some(chrono::Utc::now()),
})
}
async fn handle_process_step_result(
&self,
step_result: StepExecutionResult,
) -> TaskerResult<StepProcessResult> {
// Direct actor-based step result processing
let msg = ProcessStepResultMessage {
result: step_result.clone(),
};
match self.actors.result_processor_actor.handle(msg).await {
Ok(()) => Ok(StepProcessResult::Success {
message: format!(
"Step {} result processed successfully",
step_result.step_uuid
),
}),
Err(e) => Ok(StepProcessResult::Error {
message: format!("Failed to process step result: {e}"),
}),
}
}
}
Event → Command → Actor Flow
The complete event-to-actor flow:
┌──────────────┐
│ PGMQ Message │ Message arrives in queue
└──────┬───────┘
│
▼
┌──────────────────┐
│ Event Listener │ EventDrivenSystem processes notification
└──────┬───────────┘
│
▼
┌──────────────────┐
│ Command Channel │ Send command to processor via tokio::mpsc
└──────┬───────────┘
│
▼
┌──────────────────┐
│ Command Processor│ Convert command to actor message
└──────┬───────────┘
│
▼
┌──────────────────┐
│ Actor Registry │ Route message to appropriate actor
└──────┬───────────┘
│
▼
┌──────────────────┐
│ Handler<M>:: │ Actor processes message
│ handle() │ Delegates to underlying service
└──────┬───────────┘
│
▼
┌──────────────────┐
│ Response │ Return result to command processor
└──────────────────┘
ActorRegistry and Lifecycle
The ActorRegistry manages all 4 orchestration actors and integrates with the system lifecycle:
#![allow(unused)]
fn main() {
// During system startup
let context = Arc::new(SystemContext::with_pool(pool).await?);
let actors = ActorRegistry::build(context).await?; // Calls started() on all actors
// During operation
let msg = FinalizeTaskMessage { task_uuid };
let result = actors.task_finalizer_actor.handle(msg).await?;
// During shutdown
actors.shutdown().await; // Calls stopped() on all actors in reverse order
}
Current Actors:
- TaskRequestActor: Handles task initialization requests
- ResultProcessorActor: Processes step execution results
- StepEnqueuerActor: Manages batch processing of ready tasks
- TaskFinalizerActor: Handles task finalization with atomic claiming
Benefits for Event-Driven Architecture
The actor pattern enhances the event-driven architecture by providing:
- Type Safety: Compile-time verification of message contracts
- Consistency: Uniform lifecycle management across all components
- Testability: Clear message boundaries for isolated testing
- Observability: Actor-level metrics and tracing
- Evolvability: Easy to add new message handlers and actors
Implementation Status
The actor integration is complete:
-
Phase 1 ✅: Actor infrastructure and test harness
- OrchestrationActor, Handler
, Message traits - ActorRegistry structure
- OrchestrationActor, Handler
-
Phase 2-3 ✅: All 4 primary actors implemented
- TaskRequestActor, ResultProcessorActor
- StepEnqueuerActor, TaskFinalizerActor
-
Phase 4-6 ✅: Message hydration and module reorganization
- Hydration layer for PGMQ messages
- Clean module organization
-
Phase 7 ✅: Service decomposition
- Large services decomposed into focused components
- All files <300 lines following single responsibility principle
-
Cleanup ✅: Direct actor integration
- Command processor calls actors directly
- Removed intermediate wrapper layers
- Production-ready implementation
Service Decomposition
Large services (800-900 lines) were decomposed into focused components:
TaskFinalizer (848 → 6 files):
service.rs: Main TaskFinalizer (~200 lines)completion_handler.rs: Task completion logicevent_publisher.rs: Lifecycle event publishingexecution_context_provider.rs: Context fetchingstate_handlers.rs: State-specific handling
StepEnqueuerService (781 → 3 files):
service.rs: Main service (~250 lines)batch_processor.rs: Batch processing logicstate_handlers.rs: State-specific processing
ResultProcessor (889 → 4 files):
service.rs: Main processormetadata_processor.rs: Metadata handlingerror_handler.rs: Error processingresult_validator.rs: Result validation
This comprehensive event and command architecture, now enhanced with the actor pattern, provides the foundation for scalable, reliable, and maintainable workflow orchestration in the tasker-core system while maintaining the flexibility to operate in diverse deployment environments.