
OpenTelemetry Improvements

Last Updated: 2025-12-01 · Audience: Developers, Operators · Status: Active · Related Docs: Observability Hub | Metrics Reference | Domain Events

← Back to Observability Hub


This document describes the OpenTelemetry improvements for the domain event system: two-phase FFI telemetry initialization, domain event metrics, and enhanced observability across the distributed system.

Overview

These telemetry improvements support the domain event system while addressing FFI-specific challenges:

| Improvement | Purpose | Impact |
| --- | --- | --- |
| Two-Phase FFI Telemetry | Safe telemetry in FFI workers | No segfaults during Ruby/Python bridging |
| Domain Event Metrics | Event system observability | Real-time monitoring of event publication |
| Correlation ID Propagation | End-to-end tracing | Events traceable across distributed system |
| Worker Metrics Endpoint | Domain event statistics | /metrics/events for monitoring dashboards |

Two-Phase FFI Telemetry Initialization

The Problem

When Rust workers run under Ruby FFI bindings, OpenTelemetry’s global tracer/meter providers can cause problems:

  1. Thread Safety: Ruby’s GVL (Global VM Lock) conflicts with OpenTelemetry’s internal threading
  2. Signal Handling: OpenTelemetry’s OTLP exporter may interfere with Ruby signal handling
  3. Segfaults: Premature initialization can cause crashes during FFI boundary crossings

The Solution: Two-Phase Initialization

flowchart LR
    subgraph Phase1["Phase 1 (FFI-Safe)"]
        A[Console logger]
        B[Tracing init]
        C[No OTLP export]
        D[No global state]
    end

    subgraph Phase2["Phase 2 (Full OTel)"]
        E[OTLP exporter]
        F[Metrics export]
        G[Full tracing]
        H[Global tracer]
    end

    Phase1 -->|"After FFI bridge<br/>established"| Phase2

Worker Bootstrap Sequence:

  1. Load Rust worker library
  2. Initialize Phase 1 (console-only logging)
  3. Execute FFI bridge setup (Ruby/Python)
  4. Initialize Phase 2 (full OpenTelemetry)
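
From a Ruby (Magnus) worker, this sequence might look like the sketch below; the entry-point names are illustrative, and only init_console_only() and init_tracing() are actual tasker-shared APIs:

// Hypothetical Magnus entry point illustrating the two-phase pattern
#[magnus::init]
fn init(_ruby: &magnus::Ruby) -> Result<(), magnus::Error> {
    // Phase 1: console-only logging -- safe before any Tokio runtime exists
    tasker_shared::logging::init_console_only();

    // ... register Ruby classes and methods for the FFI bridge ...

    Ok(())
}

// Called later, once the worker owns a Tokio runtime
fn enable_full_telemetry(runtime: &tokio::runtime::Runtime) {
    runtime.block_on(async {
        // Phase 2: full OpenTelemetry (the OTLP batch exporter needs async I/O)
        tasker_shared::logging::init_tracing();
    });
}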

Implementation

Phase 1: Console-Only Initialization (FFI-Safe):

// tasker-shared/src/logging.rs (lines 284-326)

/// Initialize console-only logging (FFI-safe, no Tokio runtime required)
///
/// This function sets up structured console logging without OpenTelemetry,
/// making it safe to call from FFI initialization contexts where no Tokio
/// runtime exists yet.
pub fn init_console_only() {
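    // get_or_init runs the closure below at most once per process, so
    // repeated calls (e.g. from multiple FFI entry points) are safe no-ops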
    TRACING_INITIALIZED.get_or_init(|| {
        let environment = get_environment();
        let log_level = get_log_level(&environment);

        // Determine if we're in a TTY for ANSI color support
        let use_ansi = IsTerminal::is_terminal(&std::io::stdout());

        // Create base console layer
        let console_layer = fmt::layer()
            .with_target(true)
            .with_thread_ids(true)
            .with_level(true)
            .with_ansi(use_ansi)
            .with_filter(EnvFilter::new(&log_level));

        // Build subscriber with console layer only (no telemetry)
        let subscriber = tracing_subscriber::registry().with(console_layer);

        if subscriber.try_init().is_err() {
            tracing::debug!(
                "Global tracing subscriber already initialized"
            );
        } else {
            tracing::info!(
                environment = %environment,
                opentelemetry_enabled = false,
                context = "ffi_initialization",
                "Console-only logging initialized (FFI-safe mode)"
            );
        }

        // Initialize basic metrics (no OpenTelemetry exporters)
        metrics::init_metrics();
        metrics::orchestration::init();
        metrics::worker::init();
        metrics::database::init();
        metrics::messaging::init();
    });
}

Phase 2: Full OpenTelemetry Initialization:

// tasker-shared/src/logging.rs (lines 361-449)

/// Initialize tracing with console output and optional OpenTelemetry
///
/// When OpenTelemetry is enabled (via TELEMETRY_ENABLED=true), it also
/// configures distributed tracing with OTLP exporter.
///
/// **IMPORTANT**: When telemetry is enabled, this function MUST be called from
/// a Tokio runtime context because the batch exporter requires async I/O.
pub fn init_tracing() {
    TRACING_INITIALIZED.get_or_init(|| {
        let environment = get_environment();
        let log_level = get_log_level(&environment);
        let telemetry_config = TelemetryConfig::default();

        // Determine if we're in a TTY for ANSI color support
        let use_ansi = IsTerminal::is_terminal(&std::io::stdout());

        // Create base console layer
        let console_layer = fmt::layer()
            .with_target(true)
            .with_thread_ids(true)
            .with_level(true)
            .with_ansi(use_ansi)
            .with_filter(EnvFilter::new(&log_level));

        // Build subscriber with optional OpenTelemetry layer
        let subscriber = tracing_subscriber::registry().with(console_layer);

        if telemetry_config.enabled {
            // Initialize OpenTelemetry tracer and logger
            match (init_opentelemetry_tracer(&telemetry_config),
                   init_opentelemetry_logger(&telemetry_config)) {
                (Ok(tracer_provider), Ok(logger_provider)) => {
                    // Add trace layer
                    let tracer = tracer_provider.tracer("tasker-core");
                    let telemetry_layer = OpenTelemetryLayer::new(tracer);

                    // Add log layer (bridge tracing logs -> OTEL logs)
                    let log_layer = OpenTelemetryTracingBridge::new(&logger_provider);

                    let subscriber = subscriber.with(telemetry_layer).with(log_layer);

                    if subscriber.try_init().is_ok() {
                        tracing::info!(
                            environment = %environment,
                            opentelemetry_enabled = true,
                            logs_enabled = true,
                            otlp_endpoint = %telemetry_config.otlp_endpoint,
                            service_name = %telemetry_config.service_name,
                            "Console logging with OpenTelemetry initialized"
                        );
                    }
                }
                // ... error handling with fallback to console-only
            }
        }
    });
}

Worker Bootstrap Integration:

// workers/rust/src/bootstrap.rs (lines 69-131)

pub async fn bootstrap() -> Result<(WorkerSystemHandle, RustEventHandler)> {
    info!("📋 Creating native Rust step handler registry...");
    let registry = Arc::new(RustStepHandlerRegistry::new());

    // Get global event system for connecting to worker events
    info!("🔗 Setting up event system connection...");
    let event_system = get_global_event_system();

    // Bootstrap the worker using tasker-worker foundation
    info!("🏗️  Bootstrapping worker with tasker-worker foundation...");
    let worker_handle =
        WorkerBootstrap::bootstrap_with_event_system(Some(event_system.clone())).await?;

    // Create step event publisher registry with domain event publisher
    info!("🔔 Setting up step event publisher registry...");
    let domain_event_publisher = {
        let worker_core = worker_handle.worker_core.lock().await;
        worker_core.domain_event_publisher()
    };

    // Dual-Path: Create in-process event bus for fast event delivery
    info!("⚡ Creating in-process event bus for fast domain events...");
    let in_process_bus = Arc::new(RwLock::new(InProcessEventBus::new(
        InProcessEventBusConfig::default(),
    )));

    // Dual-Path: Create event router for dual-path delivery
    info!("🔀 Creating event router for dual-path delivery...");
    let event_router = Arc::new(RwLock::new(EventRouter::new(
        domain_event_publisher.clone(),
        in_process_bus.clone(),
    )));

    // Create registry with EventRouter for dual-path delivery
    let mut step_event_registry =
        StepEventPublisherRegistry::with_event_router(
            domain_event_publisher.clone(),
            event_router
        );

    // ... (handler registration and RustEventHandler construction elided) ...

    Ok((worker_handle, event_handler))
}

Configuration

Telemetry is configured exclusively via environment variables. This is intentional because logging must be initialized before the TOML config loader runs (to log any config loading errors).

# Enable OpenTelemetry
export TELEMETRY_ENABLED=true

# OTLP endpoint (default: http://localhost:4317)
export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317

# Service identification
export OTEL_SERVICE_NAME=tasker-orchestration
export OTEL_SERVICE_VERSION=0.1.0

# Deployment environment (falls back to TASKER_ENV, then "development")
export DEPLOYMENT_ENVIRONMENT=production

# Sampling rate (0.0 to 1.0, default: 1.0 = 100%)
export OTEL_TRACES_SAMPLER_ARG=1.0

The TelemetryConfig::default() implementation in tasker-shared/src/logging.rs:144-164 reads all values from environment variables at initialization time.
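
For illustration, an environment-driven default along these lines is sketched below; only enabled, otlp_endpoint, and service_name appear in the excerpts above, so the remaining field names and defaults are assumptions based on the variables listed:

use std::env;

// Sketch of an env-driven telemetry config; the real struct lives in
// tasker-shared/src/logging.rs and may differ in field names and types.
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
    pub enabled: bool,
    pub otlp_endpoint: String,
    pub service_name: String,
    pub sample_rate: f64,
}

impl Default for TelemetryConfig {
    fn default() -> Self {
        Self {
            enabled: env::var("TELEMETRY_ENABLED").map(|v| v == "true").unwrap_or(false),
            otlp_endpoint: env::var("OTEL_EXPORTER_OTLP_ENDPOINT")
                .unwrap_or_else(|_| "http://localhost:4317".to_string()),
            service_name: env::var("OTEL_SERVICE_NAME")
                .unwrap_or_else(|_| "tasker-core".to_string()),
            sample_rate: env::var("OTEL_TRACES_SAMPLER_ARG")
                .ok()
                .and_then(|v| v.parse().ok())
                .unwrap_or(1.0),
        }
    }
}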

Domain Event Metrics

New Metrics

Domain event observability metrics:

| Metric | Type | Description |
| --- | --- | --- |
| tasker.domain_events.published.total | Counter | Total events published |
| router.durable_routed | Counter | Events sent via durable path (PGMQ) |
| router.fast_routed | Counter | Events sent via fast path (in-process) |
| router.broadcast_routed | Counter | Events broadcast to both paths |

Implementation

Domain event metrics are emitted inline during publication:

// tasker-shared/src/events/domain_events.rs (lines 207-219)

// Emit OpenTelemetry metric
let counter = opentelemetry::global::meter("tasker")
    .u64_counter("tasker.domain_events.published.total")
    .with_description("Total number of domain events published")
    .build();

counter.add(
    1,
    &[
        opentelemetry::KeyValue::new("event_name", event_name.to_string()),
        opentelemetry::KeyValue::new("namespace", metadata.namespace.clone()),
    ],
);

Event routing statistics are tracked in the EventRouterStats and InProcessEventBusStats structures:

// tasker-shared/src/metrics/worker.rs (lines 431-444)

/// Statistics for the event router
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct EventRouterStats {
    /// Total events routed through the router
    pub total_routed: u64,
    /// Events sent via durable path (PGMQ)
    pub durable_routed: u64,
    /// Events sent via fast path (in-process)
    pub fast_routed: u64,
    /// Events broadcast to both paths
    pub broadcast_routed: u64,
    /// Fast delivery errors in broadcast mode (non-fatal, logged for monitoring)
    pub fast_delivery_errors: u64,
    /// Failed routing attempts (durable failures only)
    pub routing_errors: u64,
}

// tasker-shared/src/metrics/worker.rs (lines 455-467)

/// Statistics for the in-process event bus
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct InProcessEventBusStats {
    /// Total events dispatched through the bus
    pub total_events_dispatched: u64,
    /// Total events dispatched to Rust handlers
    pub rust_handler_dispatches: u64,
    /// Total events dispatched to FFI channel
    pub ffi_channel_dispatches: u64,
}

Prometheus Queries

Event publication rate by namespace:

sum by (namespace) (rate(tasker_domain_events_published_total[5m]))

Event failure rate:

rate(tasker_domain_events_failed_total[5m]) /
rate(tasker_domain_events_published_total[5m])

Publication latency (P95):

histogram_quantile(0.95,
  sum by (le) (rate(tasker_domain_events_publish_duration_milliseconds_bucket[5m]))
)

Latency by delivery mode:

histogram_quantile(0.95,
  sum by (delivery_mode, le) (
    rate(tasker_domain_events_publish_duration_milliseconds_bucket[5m])
  )
)

Worker Metrics Endpoint

/metrics/events Endpoint

The worker exposes domain event statistics through a dedicated metrics endpoint:

Request:

curl http://localhost:8081/metrics/events

Response:

{
  "router": {
    "total_routed": 42,
    "durable_routed": 10,
    "fast_routed": 30,
    "broadcast_routed": 2,
    "fast_delivery_errors": 0,
    "routing_errors": 0
  },
  "in_process_bus": {
    "total_events_dispatched": 32,
    "rust_handler_dispatches": 20,
    "ffi_channel_dispatches": 12
  },
  "captured_at": "2025-12-01T10:30:00Z",
  "worker_id": "worker-01234567"
}
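
Because this endpoint also backs E2E test assertions (see the handler's doc comment below), a test can deserialize the response directly into DomainEventStats. A hedged sketch, assuming reqwest with its json feature:

// Hypothetical E2E helper: verify events flowed through the fast path.
async fn assert_fast_path_used(base_url: &str) -> Result<(), reqwest::Error> {
    let stats: DomainEventStats = reqwest::get(format!("{base_url}/metrics/events"))
        .await?
        .json()
        .await?;

    // At least one event should have been dispatched on the in-process (fast) path
    assert!(
        stats.in_process_bus.total_events_dispatched > 0,
        "expected in-process dispatches, got: {stats:?}"
    );
    Ok(())
}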

Implementation

// tasker-worker/src/web/handlers/metrics.rs (lines 178-218)

/// Domain event statistics endpoint: GET /metrics/events
///
/// Returns statistics about domain event routing and delivery paths.
/// Used for monitoring event publishing and by E2E tests to verify
/// events were published through the expected delivery paths.
///
/// # Response
///
/// Returns statistics for:
/// - **Router stats**: durable_routed, fast_routed, broadcast_routed counts
/// - **In-process bus stats**: handler dispatches, FFI channel dispatches
pub async fn domain_event_stats(
    State(state): State<Arc<WorkerWebState>>,
) -> Json<DomainEventStats> {
    debug!("Serving domain event statistics");

    // Use cached event components - does not lock worker core
    let stats = state.domain_event_stats().await;

    Json(stats)
}

The DomainEventStats structure is defined in tasker-shared/src/types/web.rs:

// tasker-shared/src/types/web.rs (lines 546-555)

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct DomainEventStats {
    /// Event router statistics
    pub router: EventRouterStats,
    /// In-process event bus statistics
    pub in_process_bus: InProcessEventBusStats,
    /// Timestamp when stats were captured
    pub captured_at: DateTime<Utc>,
    /// Worker ID for correlation
    pub worker_id: String,
}

Correlation ID Propagation

End-to-End Tracing

Domain events maintain correlation IDs for distributed tracing:

flowchart LR
    subgraph TaskCreation["Task Creation"]
        A[correlation_id<br/>UUIDv7]
    end

    subgraph StepExecution["Step Execution"]
        B[correlation_id<br/>propagated]
    end

    subgraph DomainEvent["Domain Event"]
        C[correlation_id<br/>in metadata]
    end

    TaskCreation --> StepExecution --> DomainEvent

    subgraph TraceContext["Trace Context"]
        D[task_uuid]
        E[step_uuid]
        F[step_name]
        G[namespace]
        H[correlation_id]
    end
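
Correlation IDs are UUIDv7 values, which embed a millisecond timestamp and therefore sort chronologically. A minimal sketch, assuming the uuid crate with the v7 feature enabled:

use uuid::Uuid;

// Generated once at task creation; UUIDv7's leading timestamp bits make
// correlation IDs time-ordered, which helps when scanning queues and traces.
let correlation_id = Uuid::now_v7();

// The same value is carried into every EventMetadata rather than regenerated
// (see "Correlation ID Not Propagating" under Troubleshooting).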

Tracing Integration

The DomainEventPublisher::publish_event method uses #[instrument] for automatic span creation:

// tasker-shared/src/events/domain_events.rs (lines 157-231)

#[instrument(skip(self, payload, metadata), fields(
    event_name = %event_name,
    namespace = %metadata.namespace,
    correlation_id = %metadata.correlation_id
))]
pub async fn publish_event(
    &self,
    event_name: &str,
    payload: DomainEventPayload,
    metadata: EventMetadata,
) -> Result<Uuid, DomainEventError> {
    let event_id = Uuid::now_v7();
    let queue_name = format!("{}_domain_events", metadata.namespace);

    debug!(
        event_id = %event_id,
        event_name = %event_name,
        queue_name = %queue_name,
        task_uuid = %metadata.task_uuid,
        correlation_id = %metadata.correlation_id,
        "Publishing domain event"
    );

    // Create and serialize domain event
    let event = DomainEvent {
        event_id,
        event_name: event_name.to_string(),
        event_version: "1.0".to_string(),
        payload,
        metadata: metadata.clone(),
    };

    // Serialize the event (assumes DomainEventError: From<serde_json::Error>)
    let event_json = serde_json::to_value(&event)?;

    // Publish to PGMQ
    let message_id = self.message_client
        .send_json_message(&queue_name, &event_json)
        .await?;

    info!(
        event_id = %event_id,
        message_id = message_id,
        correlation_id = %metadata.correlation_id,
        "Domain event published successfully"
    );

    Ok(event_id)
}

Querying by Correlation ID

Find all events for a task:

# In Grafana/Tempo
correlation_id = "0199c3e0-ccdb-7581-87ab-3f67daeaa4a5"

In PostgreSQL (PGMQ queues):

SELECT
    message->>'event_name' as event,
    message->'metadata'->>'step_name' as step,
    message->'metadata'->>'fired_at' as fired_at
FROM pgmq.q_payments_domain_events
WHERE message->'metadata'->>'correlation_id' = '0199c3e0-ccdb-7581-87ab-3f67daeaa4a5'
ORDER BY message->'metadata'->>'fired_at';

Span Hierarchy

Domain Event Spans

A task that publishes domain events produces the following span tree:

Task Execution (root span)
├── Step Execution
│   ├── Handler Call
│   │   └── Business Logic
│   └── publish_domain_event           ◄── NEW
│       ├── route_event
│       │   ├── publish_durable        (if durable/broadcast)
│       │   └── publish_fast           (if fast/broadcast)
│       └── record_metrics
└── Result Submission

Span Attributes

| Span | Attributes |
| --- | --- |
| publish_domain_event | event_name, namespace, correlation_id, delivery_mode |
| route_event | delivery_mode, target_queue (if durable) |
| publish_durable | queue_name, message_size |
| publish_fast | subscriber_count |
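
Deferred attributes like target_queue can be recorded with tracing's field::Empty plus Span::record once the value is known. A hypothetical sketch for the route_event span (the signature and DeliveryMode enum are illustrative, not the actual router API):

// Illustrative only -- the real router lives in tasker-shared
#[tracing::instrument(skip(self, event), fields(
    delivery_mode = ?delivery_mode,
    target_queue = tracing::field::Empty, // recorded only on the durable path
))]
async fn route_event(
    &self,
    event: DomainEvent,
    delivery_mode: DeliveryMode, // assumed enum: Durable | Fast | Broadcast
) -> Result<(), DomainEventError> {
    if matches!(delivery_mode, DeliveryMode::Durable | DeliveryMode::Broadcast) {
        let queue_name = format!("{}_domain_events", event.metadata.namespace);
        // Fill in the deferred span attribute now that the queue is known
        tracing::Span::current().record("target_queue", queue_name.as_str());
    }
    // ... dispatch to durable and/or fast paths ...
    todo!()
}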

Troubleshooting

Console-Only Mode (No OTLP Export)

Symptom: Logs show “Console-only logging initialized (FFI-safe mode)” but no OpenTelemetry traces

Cause: init_console_only() was called but init_tracing() was never called, or TELEMETRY_ENABLED=false

Fix:

  1. Check initialization logs:
    grep -E "(Console-only|OpenTelemetry)" logs/worker.log
    
  2. Verify TELEMETRY_ENABLED=true is set:
    grep "opentelemetry_enabled" logs/worker.log
    

Domain Event Metrics Missing

Symptom: /metrics/events returns zeros for all stats

Cause: Events not being published or the event router/bus not tracking statistics

Fix:

  1. Verify events are being published:
    grep "Domain event published successfully" logs/worker.log
    
  2. Check event router initialization:
    grep "event router" logs/worker.log
    
  3. Verify in-process event bus is configured:
    grep "in-process event bus" logs/worker.log
    

Correlation ID Not Propagating

Symptom: Events have different correlation IDs than parent task

Cause: EventMetadata not constructed with task’s correlation_id

Fix: Verify EventMetadata is constructed with the correct correlation_id from the task:

// When constructing EventMetadata, always use the task's correlation_id
let metadata = EventMetadata {
    task_uuid: step_data.task.task.task_uuid,
    step_uuid: Some(step_data.workflow_step.workflow_step_uuid),
    step_name: Some(step_data.workflow_step.name.clone()),
    namespace: step_data.task.namespace_name.clone(),
    correlation_id: step_data.task.task.correlation_id,  // Must use task's ID
    fired_at: chrono::Utc::now(),
    fired_by: handler_name.to_string(),
};

Best Practices

1. Always Use Two-Phase Init for FFI Workers

// Correct: Two-phase initialization pattern
// Phase 1: During FFI initialization (Magnus, PyO3, WASM)
tasker_shared::logging::init_console_only();

// Phase 2: After runtime creation
let runtime = tokio::runtime::Runtime::new()?;
runtime.block_on(async {
    tasker_shared::logging::init_tracing();
});

// Incorrect: Calling init_tracing() during FFI initialization
// before Tokio runtime exists (may cause issues with OTLP exporter)

2. Include Correlation ID in All Events

// Always propagate correlation_id from the task
let metadata = EventMetadata {
    task_uuid: step_data.task.task.task_uuid,
    step_uuid: Some(step_data.workflow_step.workflow_step_uuid),
    step_name: Some(step_data.workflow_step.name.clone()),
    namespace: step_data.task.namespace_name.clone(),
    correlation_id: step_data.task.task.correlation_id,  // Critical!
    fired_at: chrono::Utc::now(),
    fired_by: handler_name.to_string(),
};

3. Use Structured Logging with Correlation Context

// All logs should include correlation_id for trace correlation
info!(
    event_id = %event_id,
    event_name = %event_name,
    correlation_id = %metadata.correlation_id,
    namespace = %metadata.namespace,
    "Domain event published successfully"
);


This telemetry architecture provides robust observability for domain events while ensuring safe operation with FFI-based language bindings.