Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions lib/bindings/python/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ fn create_request_context(
/// import the module.
#[pymodule]
fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
rs::logging::init();
m.add_function(wrap_pyfunction!(llm::kv::compute_block_hash_for_seq_py, m)?)?;
m.add_function(wrap_pyfunction!(log_message, m)?)?;
m.add_function(wrap_pyfunction!(register_llm, m)?)?;
Expand Down Expand Up @@ -423,10 +424,12 @@ impl DistributedRuntime {

let runtime = worker.runtime().clone();

// Initialize logging in context where tokio runtime is available
// Activate OTEL exports in context where tokio runtime is available
// otel exporter requires it
runtime.secondary().block_on(async {
rs::logging::init();
if let Err(e) = rs::logging::activate_otel_exports() {
tracing::warn!("Failed to activate OTEL exports: {}", e);
}
});

let inner =
Expand Down
131 changes: 75 additions & 56 deletions lib/runtime/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
//! ```

use std::collections::{BTreeMap, HashMap};
use std::sync::Once;
use std::sync::{Once, Mutex as StdMutex};
use once_cell::sync::OnceCell;

use figment::{
Figment,
Expand Down Expand Up @@ -75,6 +76,7 @@ use opentelemetry::{global, trace::Tracer};
use opentelemetry_otlp::WithExportConfig;

use opentelemetry::trace::TracerProvider as _;
use tracing_opentelemetry::OpenTelemetryLayer;
use opentelemetry::{Key, KeyValue};
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::trace::SdkTracerProvider;
Expand All @@ -86,6 +88,7 @@ use std::time::Duration;
use tracing::{info, instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::reload;

/// ENV used to set the log level
const FILTER_ENV: &str = "DYN_LOG";
Expand Down Expand Up @@ -114,6 +117,15 @@ const DEFAULT_OTEL_SERVICE_NAME: &str = "dynamo";
/// Once instance to ensure the logger is only initialized once
static INIT: Once = Once::new();

/// Type alias for the OTEL layer - using Option to allow starting with None
type OtelLayer = Option<OpenTelemetryLayer<Registry, opentelemetry_sdk::trace::Tracer>>;

/// Type alias for the OTEL layer reload handle
type OtelReloadHandle = reload::Handle<OtelLayer, Registry>;

/// Global reload handle for the OpenTelemetry layer
static OTEL_RELOAD_HANDLE: OnceCell<StdMutex<OtelReloadHandle>> = OnceCell::new();

#[derive(Serialize, Deserialize, Debug)]
struct LoggingConfig {
log_level: String,
Expand All @@ -135,9 +147,9 @@ impl Default for LoggingConfig {
("tonic".to_string(), "error".to_string()),
("mistralrs_core".to_string(), "error".to_string()),
("hf_hub".to_string(), "error".to_string()),
("opentelemetry".to_string(), "error".to_string()),
("opentelemetry-otlp".to_string(), "error".to_string()),
("opentelemetry_sdk".to_string(), "error".to_string()),
("opentelemetry".to_string(), "trace".to_string()),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

temporarily modified for debugging

("opentelemetry-otlp".to_string(), "trace".to_string()),
("opentelemetry_sdk".to_string(), "trace".to_string()),
]),
}
}
Expand Down Expand Up @@ -714,6 +726,56 @@ pub fn init() {
});
}

/// Activate OTEL exports by reloading the OpenTelemetry layer with an OTLP exporter.
/// This should be called after the Tokio runtime is available and only if OTEL_EXPORT_ENABLED is set.
pub fn activate_otel_exports() -> Result<(), Box<dyn std::error::Error>> {
if !otlp_exporter_enabled() {
tracing::debug!("OTEL exports not enabled, skipping activation");
return Ok(());
}

let handle = OTEL_RELOAD_HANDLE.get()
.ok_or("Logging not initialized with JSONL mode")?;

let service_name = get_service_name();
let endpoint = std::env::var(OTEL_EXPORT_ENDPOINT_ENV)
.unwrap_or_else(|_| DEFAULT_OTLP_ENDPOINT.to_string());

tracing::info!(
"Activating OpenTelemetry OTLP export, endpoint: {}, service: {}",
endpoint,
service_name
);

// Initialize OTLP exporter using gRPC (Tonic)
let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.build()?;

// Create tracer provider with batch exporter and service name
let tracer_provider = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_batch_exporter(otlp_exporter)
.with_resource(
opentelemetry_sdk::Resource::builder_empty()
.with_service_name(service_name.clone())
.build(),
)
.build();

// Get a tracer from the provider
let tracer = tracer_provider.tracer(service_name);
let new_otel_layer: OtelLayer = Some(OpenTelemetryLayer::new(tracer));

// Reload the layer
handle.lock()
.map_err(|e| format!("Failed to acquire lock on reload handle: {}", e))?
.reload(new_otel_layer)?;

tracing::info!("OpenTelemetry OTLP export activated successfully");
Ok(())
}

#[cfg(feature = "tokio-console")]
fn setup_logging() {
let tokio_console_layer = console_subscriber::ConsoleLayer::builder()
Expand All @@ -739,7 +801,6 @@ fn setup_logging() {
fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
let fmt_filter_layer = filters(load_config());
let trace_filter_layer = filters(load_config());
let otel_filter_layer = filters(load_config());

if jsonl_logging_enabled() {
let l = fmt::layer()
Expand All @@ -749,61 +810,19 @@ fn setup_logging() -> Result<(), Box<dyn std::error::Error>> {
.with_writer(std::io::stderr)
.with_filter(fmt_filter_layer);

// Create OpenTelemetry tracer - conditionally export to OTLP based on env var
let service_name = get_service_name();

// Build tracer provider - with or without OTLP export
let tracer_provider = if otlp_exporter_enabled() {
// Export enabled: create OTLP exporter with batch processor
let endpoint = std::env::var(OTEL_EXPORT_ENDPOINT_ENV)
.unwrap_or_else(|_| DEFAULT_OTLP_ENDPOINT.to_string());

tracing::info!(
"OpenTelemetry OTLP export enabled, endpoint: {}, service: {}",
endpoint,
service_name
);

// Initialize OTLP exporter using gRPC (Tonic)
let otlp_exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.build()?;

// Create tracer provider with batch exporter and service name
opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_batch_exporter(otlp_exporter)
.with_resource(
opentelemetry_sdk::Resource::builder_empty()
.with_service_name(service_name.clone())
.build(),
)
.build()
} else {
// No export - traces generated locally only (for logging/trace IDs)
tracing::info!(
"OpenTelemetry OTLP export disabled, traces local only, service: {}",
service_name
);
// Start with None (no-op) OpenTelemetry layer
// This will be reloaded with a real layer when activate_otel_exports() is called
let otel_layer: OtelLayer = None;

opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_resource(
opentelemetry_sdk::Resource::builder_empty()
.with_service_name(service_name.clone())
.build(),
)
.build()
};
// Create a reloadable layer
let (otel_layer, reload_handle) = reload::Layer::new(otel_layer);

// Get a tracer from the provider
let tracer = tracer_provider.tracer(service_name);
// Store the reload handle globally
OTEL_RELOAD_HANDLE.set(StdMutex::new(reload_handle))
.map_err(|_| "Failed to set OTEL reload handle")?;

tracing_subscriber::registry()
.with(
tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(otel_filter_layer),
)
.with(otel_layer)
.with(DistributedTraceIdLayer.with_filter(trace_filter_layer))
.with(l)
.init();
Expand Down
Loading