diff --git a/Earthfile b/Earthfile
index 590dc8eb86..5fc8ef9da4 100644
--- a/Earthfile
+++ b/Earthfile
@@ -91,13 +91,13 @@ rust-base:
     ENV RUSTUP_HOME=/usr/local/rustup
     ENV CARGO_HOME=/usr/local/cargo
     ENV PATH=/usr/local/cargo/bin:$PATH
-    ENV RUST_VERSION=1.89.0
+    ENV RUST_VERSION=1.90.0
     ENV RUSTARCH=x86_64-unknown-linux-gnu

     RUN wget --tries=3 --waitretry=5 "https://static.rust-lang.org/rustup/archive/1.28.1/x86_64-unknown-linux-gnu/rustup-init" && \
         echo "a3339fb004c3d0bb9862ba0bce001861fe5cbde9c10d16591eb3f39ee6cd3e7f *rustup-init" | sha256sum -c - && \
         chmod +x rustup-init && \
-        ./rustup-init -y --no-modify-path --profile minimal --default-toolchain 1.89.0 --default-host x86_64-unknown-linux-gnu && \
+        ./rustup-init -y --no-modify-path --profile minimal --default-toolchain 1.90.0 --default-host x86_64-unknown-linux-gnu && \
         rm rustup-init && \
         chmod -R a+w $RUSTUP_HOME $CARGO_HOME

diff --git a/container/Dockerfile b/container/Dockerfile
index 3308c077bc..d6d1535586 100644
--- a/container/Dockerfile
+++ b/container/Dockerfile
@@ -93,7 +93,7 @@ ENV SCCACHE_BUCKET=${USE_SCCACHE:+${SCCACHE_BUCKET}} \
 ENV RUSTUP_HOME=/usr/local/rustup \
     CARGO_HOME=/usr/local/cargo \
     PATH=/usr/local/cargo/bin:$PATH \
-    RUST_VERSION=1.89.0
+    RUST_VERSION=1.90.0

 # Define Rust target based on ARCH_ALT ARG
 ARG RUSTARCH=${ARCH_ALT}-unknown-linux-gnu
@@ -367,4 +367,4 @@ RUN --mount=type=bind,source=./container/launch_message.txt,target=/opt/dynamo/l
     echo "cat ~/.launch_screen" >> ~/.bashrc

 ENTRYPOINT ["/opt/nvidia/nvidia_entrypoint.sh"]
-CMD []
\ No newline at end of file
+CMD []
diff --git a/container/Dockerfile.sglang-wideep b/container/Dockerfile.sglang-wideep
index 8691d1500e..f084448538 100644
--- a/container/Dockerfile.sglang-wideep
+++ b/container/Dockerfile.sglang-wideep
@@ -11,7 +11,7 @@ ARG ARCH_ALT="x86_64"
 ARG NIXL_UCX_REF="v1.19.0"
 ARG NIXL_TAG="0.5.0"
 ARG CMAKE_VERSION="3.31.8"
-ARG RUST_VERSION="1.89.0"
+ARG RUST_VERSION="1.90.0"
 ARG CARGO_BUILD_JOBS="16"

 RUN apt-get update -y && \
diff --git a/container/Dockerfile.trtllm_prebuilt b/container/Dockerfile.trtllm_prebuilt
index 28558610e1..ce237aea0d 100644
--- a/container/Dockerfile.trtllm_prebuilt
+++ b/container/Dockerfile.trtllm_prebuilt
@@ -41,7 +41,7 @@ ARG RUSTARCH=${ARCH_ALT}-unknown-linux-gnu
 ENV RUSTUP_HOME=/usr/local/rustup \
     CARGO_HOME=/usr/local/cargo \
     PATH=/usr/local/cargo/bin:$PATH \
-    RUST_VERSION=1.89.0
+    RUST_VERSION=1.90.0

 # Install Rust using RUSTARCH derived from ARCH_ALT
 RUN wget --tries=3 --waitretry=5 "https://static.rust-lang.org/rustup/archive/1.28.1/${RUSTARCH}/rustup-init" && \
diff --git a/lib/bindings/python/Cargo.toml b/lib/bindings/python/Cargo.toml
index 07c738f9ce..8b698fa441 100644
--- a/lib/bindings/python/Cargo.toml
+++ b/lib/bindings/python/Cargo.toml
@@ -8,7 +8,7 @@
 [package]
 name = "dynamo-py3"
 version = "0.5.0"
-edition = "2021"
+edition = "2024"
 authors = ["NVIDIA"]
 license = "Apache-2.0"
 homepage = "https://github.com/ai-dynamo/dynamo"
diff --git a/lib/bindings/python/rust/context.rs b/lib/bindings/python/rust/context.rs
index 1edf38d465..bf7e4c2f82 100644
--- a/lib/bindings/python/rust/context.rs
+++ b/lib/bindings/python/rust/context.rs
@@ -3,8 +3,8 @@

 // Context is a wrapper around the AsyncEngineContext to allow for Python bindings.

-use dynamo_runtime::pipeline::context::Controller;
 pub use dynamo_runtime::pipeline::AsyncEngineContext;
+use dynamo_runtime::pipeline::context::Controller;
 use pyo3::prelude::*;
 use std::sync::Arc;
diff --git a/lib/bindings/python/rust/engine.rs b/lib/bindings/python/rust/engine.rs
index dfef663c73..30178a3767 100644
--- a/lib/bindings/python/rust/engine.rs
+++ b/lib/bindings/python/rust/engine.rs
@@ -1,7 +1,7 @@
 // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 // SPDX-License-Identifier: Apache-2.0

-use super::context::{callable_accepts_kwarg, Context};
+use super::context::{Context, callable_accepts_kwarg};
 use pyo3::prelude::*;
 use pyo3::types::{PyDict, PyModule};
 use pyo3::{PyAny, PyErr};
@@ -9,15 +9,15 @@ use pyo3_async_runtimes::TaskLocals;
 use pythonize::{depythonize, pythonize};
 use std::sync::Arc;
 use tokio::sync::mpsc;
-use tokio_stream::{wrappers::ReceiverStream, StreamExt};
+use tokio_stream::{StreamExt, wrappers::ReceiverStream};

 pub use dynamo_runtime::{
+    CancellationToken, Error, Result,
     pipeline::{
-        async_trait, AsyncEngine, AsyncEngineContextProvider, Data, ManyOut, ResponseStream,
-        SingleIn,
+        AsyncEngine, AsyncEngineContextProvider, Data, ManyOut, ResponseStream, SingleIn,
+        async_trait,
     },
     protocols::annotated::Annotated,
-    CancellationToken, Error, Result,
 };

 pub use serde::{Deserialize, Serialize};
@@ -180,7 +180,7 @@ where
                 let py_request = pythonize(py, &request)?;
                 let py_ctx = Py::new(py, Context::new(ctx_python.clone()))?;

-                let gen = if has_context {
+                let gen_result = if has_context {
                     // Pass context as a kwarg
                     let kwarg = PyDict::new(py);
                     kwarg.set_item("context", &py_ctx)?;
@@ -191,7 +191,10 @@ where
                 }?;

                 let locals = TaskLocals::new(event_loop.bind(py).clone());
-                pyo3_async_runtimes::tokio::into_stream_with_locals_v1(locals, gen.into_bound(py))
+                pyo3_async_runtimes::tokio::into_stream_with_locals_v1(
+                    locals,
+                    gen_result.into_bound(py),
+                )
             })
         })
         .await??;
@@ -234,18 +237,27 @@ where
                             // right now, this is impossible as we are not passing the context to the python async generator
                             // todo: add task-local context to the python async generator
                             ctx.stop_generating();
-                            let msg = format!("critical error: invalid response object from python async generator; application-logic-mismatch: {}", e);
+                            let msg = format!(
+                                "critical error: invalid response object from python async generator; application-logic-mismatch: {}",
+                                e
+                            );
                             msg
                         }
                         ResponseProcessingError::PyGeneratorExit(_) => {
                             "Stream ended before generation completed".to_string()
                         }
                         ResponseProcessingError::PythonException(e) => {
-                            let msg = format!("a python exception was caught while processing the async generator: {}", e);
+                            let msg = format!(
+                                "a python exception was caught while processing the async generator: {}",
+                                e
+                            );
                             msg
                         }
                         ResponseProcessingError::OffloadError(e) => {
-                            let msg = format!("critical error: failed to offload the python async generator to a new thread: {}", e);
+                            let msg = format!(
+                                "critical error: failed to offload the python async generator to a new thread: {}",
+                                e
+                            );
                             msg
                         }
                     };
diff --git a/lib/bindings/python/rust/http.rs b/lib/bindings/python/rust/http.rs
index 61dcce607a..54be11db16 100644
--- a/lib/bindings/python/rust/http.rs
+++ b/lib/bindings/python/rust/http.rs
@@ -5,15 +5,14 @@
 use std::sync::Arc;

 use pyo3::{exceptions::PyException, prelude::*};

-use crate::{engine::*, to_pyerr, CancellationToken};
+use crate::{CancellationToken, engine::*, to_pyerr};
 pub use dynamo_llm::endpoint_type::EndpointType;
 pub use dynamo_llm::http::service::{error as http_error, service_v2};
 pub use dynamo_runtime::{
-    error,
-    pipeline::{async_trait, AsyncEngine, Data, ManyOut, SingleIn},
+    Error, Result, error,
+    pipeline::{AsyncEngine, Data, ManyOut, SingleIn, async_trait},
     protocols::annotated::Annotated,
-    Error, Result,
 };

 #[pyclass]
diff --git a/lib/bindings/python/rust/lib.rs b/lib/bindings/python/rust/lib.rs
index 4bfd436752..072857fa53 100644
--- a/lib/bindings/python/rust/lib.rs
+++ b/lib/bindings/python/rust/lib.rs
@@ -3,10 +3,10 @@
 use futures::StreamExt;
 use once_cell::sync::OnceCell;
+use pyo3::IntoPyObjectExt;
 use pyo3::exceptions::PyStopAsyncIteration;
 use pyo3::types::PyBytes;
 use pyo3::types::{PyDict, PyList, PyString};
-use pyo3::IntoPyObjectExt;
 use pyo3::{exceptions::PyException, prelude::*};
 use rand::seq::IteratorRandom as _;
 use rs::pipeline::network::Ingress;
@@ -19,8 +19,8 @@ use tokio::sync::Mutex;
 use dynamo_runtime::{
     self as rs, logging,
     pipeline::{
-        context::Context as RsContext, network::egress::push_router::RouterMode as RsRouterMode,
-        EngineStream, ManyOut, SingleIn,
+        EngineStream, ManyOut, SingleIn, context::Context as RsContext,
+        network::egress::push_router::RouterMode as RsRouterMode,
     },
     protocols::annotated::Annotated as RsAnnotated,
     traits::DistributedRuntimeProvider,
@@ -446,9 +446,9 @@ impl DistributedRuntime {
                     Ok(sock) => sockets.push(sock),
                     Err(e) => {
                         tracing::error!(
-                        "Failed to bind to port block starting at {start_port} (attempt {}): {e}",
-                        attempt_idx + 1,
-                        );
+                            "Failed to bind to port block starting at {start_port} (attempt {}): {e}",
+                            attempt_idx + 1,
+                        );
                         bind_failed = true;
                         break;
                     }
@@ -504,7 +504,8 @@ impl DistributedRuntime {
             }
         }

         Err(PyErr::new::<PyException>(format!(
-            "Failed to allocate and reserve a port block of size {block_size} from range {min}-{max} after {candidate_count} attempts")))
+            "Failed to allocate and reserve a port block of size {block_size} from range {min}-{max} after {candidate_count} attempts"
+        )))
     })
 }
@@ -735,12 +736,12 @@ impl Endpoint {
             })?;

         // Require an object/dict
-        if let Some(ref payload) = health_payload_json {
-            if !payload.is_object() {
-                return Err(pyo3::exceptions::PyTypeError::new_err(
-                    "health_check_payload must be a JSON object (dict)",
-                ));
-            }
+        if let Some(ref payload) = health_payload_json
+            && !payload.is_object()
+        {
+            return Err(pyo3::exceptions::PyTypeError::new_err(
+                "health_check_payload must be a JSON object (dict)",
+            ));
         }

         let mut builder = self
@@ -1087,11 +1088,10 @@ async fn process_stream(
         // Convert the response to a PyObject using Python's GIL
         let annotated: RsAnnotated = response;
         let annotated: RsAnnotated = annotated.map_data(|data| {
-            let result = Python::with_gil(|py| match pythonize::pythonize(py, &data) {
+            Python::with_gil(|py| match pythonize::pythonize(py, &data) {
                 Ok(pyobj) => Ok(pyobj.into()),
                 Err(e) => Err(e.to_string()),
-            });
-            result
+            })
         });

         let is_error = annotated.is_error();
diff --git a/lib/bindings/python/rust/llm/block_manager/vllm.rs b/lib/bindings/python/rust/llm/block_manager/vllm.rs
index 56bd675558..524af640ae 100644
--- a/lib/bindings/python/rust/llm/block_manager/vllm.rs
+++ b/lib/bindings/python/rust/llm/block_manager/vllm.rs
@@ -11,13 +11,13 @@ use pyo3::{prelude::*, wrap_pymodule};

 use dynamo_llm::{
     block_manager::{
+        BasicMetadata, DeviceStorage, Storage,
         block::{
+            BlockId, ImmutableBlock, MutableBlock,
             data::logical::distributed_leader_worker::DistributedLeaderWorkerResources,
             locality::{LocalityProvider, Logical},
-            BlockId, ImmutableBlock, MutableBlock,
         },
         pool::{BlockPool, BlockPoolError},
-        BasicMetadata, DeviceStorage, Storage,
     },
     tokens::{SaltHash, SequenceHash, TokenBlockSequence, Tokens},
 };
@@ -314,7 +314,19 @@ impl std::fmt::Debug for GenericSlotUpdate {
             format!("{:?}", self.tokens_to_append)
         };

-        write!(f, "GenericSlotUpdate(request_id: {}, request_num_tokens: {}, request_num_computed_tokens: {}, tokens_to_append: {}, num_new_tokens: {}, num_new_computed_tokens: {:?}, new_computed_blocks: {:?}, num_lookahead_blocks: {:?}, delay_cache_blocks: {:?})", self.request_id, self.request_num_tokens, self.request_num_computed_tokens, tokens_display, self.num_new_tokens, self.num_new_computed_tokens, self.new_computed_blocks, self.num_lookahead_blocks, self.delay_cache_blocks)
+        write!(
+            f,
+            "GenericSlotUpdate(request_id: {}, request_num_tokens: {}, request_num_computed_tokens: {}, tokens_to_append: {}, num_new_tokens: {}, num_new_computed_tokens: {:?}, new_computed_blocks: {:?}, num_lookahead_blocks: {:?}, delay_cache_blocks: {:?})",
+            self.request_id,
+            self.request_num_tokens,
+            self.request_num_computed_tokens,
+            tokens_display,
+            self.num_new_tokens,
+            self.num_new_computed_tokens,
+            self.new_computed_blocks,
+            self.num_lookahead_blocks,
+            self.delay_cache_blocks
+        )
     }
 }
@@ -558,7 +570,8 @@ impl SlotManager {
                 let isl_host = slot.num_blocks_cached_from_host() * self.block_size;
                 let isl_disk = slot.num_blocks_cached_from_disk() * self.block_size;
                 tracing::info!(
-                    request_id, "request complete isl: {} - cache hits: device: {}, host: {}, disk: {} - prefilled: {}",
+                    request_id,
+                    "request complete isl: {} - cache hits: device: {}, host: {}, disk: {} - prefilled: {}",
                     isl,
                     isl_device,
                     isl_host,
diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs
index 08c1449249..284e2f2006 100644
--- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs
+++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader.rs
@@ -9,21 +9,21 @@ use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics;
 use dynamo_runtime::DistributedRuntime;
 use slot::{ConnectorSlotManager, SlotError, SlotManager, SlotState};

+use crate::DistributedRuntime as PyDistributedRuntime;
 use crate::llm::block_manager::BlockManagerBuilder;
 use crate::llm::block_manager::{
-    distributed::KvbmLeader as PyKvbmLeader, vllm::connector::leader::slot::VllmConnectorSlot,
-    vllm::KvbmRequest, VllmBlockManager,
+    VllmBlockManager, distributed::KvbmLeader as PyKvbmLeader, vllm::KvbmRequest,
+    vllm::connector::leader::slot::VllmConnectorSlot,
 };
-use crate::DistributedRuntime as PyDistributedRuntime;
 use dynamo_runtime::metrics::prometheus_names::kvbm_connector;

 use dynamo_llm::block_manager::{
+    BasicMetadata, DiskStorage, ImmutableBlock, PinnedStorage,
     block::{
         data::logical::distributed_leader_worker::DistributedLeaderWorkerResources,
         locality::Logical,
     },
     connector::*,
-    BasicMetadata, DiskStorage, ImmutableBlock, PinnedStorage,
 };
 use dynamo_llm::tokens::{SaltHash, TokenBlockSequence, Tokens};
 use std::sync::{Arc, OnceLock};
@@ -218,7 +218,9 @@ impl Leader for KvConnectorLeader {
         );

         if slot.state() == SlotState::SkippedPrefill || slot.state() == SlotState::SkippedDecode {
-            tracing::debug!("slot is in the SkippedPrefill or SkippedDecode state; will resume from skipped and return early");
+            tracing::debug!(
+                "slot is in the SkippedPrefill or SkippedDecode state; will resume from skipped and return early"
+            );
             match slot.state() {
                 SlotState::SkippedPrefill => {
                     slot.mark_as_prefilling(self.iteration_counter)?;
diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs
index 550d841f3b..447e18a1cf 100644
--- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs
+++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/leader/slot.rs
@@ -5,10 +5,10 @@ use std::{any::Any, sync::Arc};

 use dynamo_llm::{
     block_manager::{
-        block::{locality::LocalityProvider, BlockMetadata},
+        Storage,
+        block::{BlockMetadata, locality::LocalityProvider},
         connector::protocol::{LeaderTransferRequest, RequestType, TransferType},
         distributed::{BlockTransferPool, BlockTransferRequest, KvbmLeader},
-        Storage,
     },
     tokens::TokenBlock,
 };
@@ -398,7 +398,11 @@ impl VllmConnectorSlot {
             SlotState::SkippedPrefill => Ok(()), // already skipped
             SlotState::SkippedDecode => Ok(()), // already skipped
             _ => {
-                tracing::debug!("slot is in the {:?} state; will not explicitly mark as skipped, request_id: {}", self.state, self.request_id);
+                tracing::debug!(
+                    "slot is in the {:?} state; will not explicitly mark as skipped, request_id: {}",
+                    self.state,
+                    self.request_id
+                );
                 Ok(())
             }
         }
@@ -594,7 +598,8 @@ impl Slot for VllmConnectorSlot {
         if computed_position < self.current_position {
             tracing::debug!(
                 "computed_position={} < current_position={}, so we are onboarding during prefilling phase",
-                computed_position, self.current_position
+                computed_position,
+                self.current_position
             );
             return Ok(());
         }
@@ -916,7 +921,8 @@ impl ExternallyManagedDeviceSlot for VllmConnectorSlot {
         if self.current_position + num_tokens > self.sequence().total_tokens() {
             return Err(SlotError::InvalidOperation(format!(
                 "cannot advance computed position from {} by {num_tokens} tokens, total tokens is {}",
-                self.current_position, self.sequence().total_tokens()
+                self.current_position,
+                self.sequence().total_tokens()
             )));
         }

diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_leader.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_leader.rs
index 2890e1673c..61a44960ac 100644
--- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_leader.rs
+++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_leader.rs
@@ -3,12 +3,12 @@

 use super::*;

+use crate::DistributedRuntime as PyDistributedRuntime;
+use crate::llm::block_manager::BlockManagerBuilder;
 use crate::llm::block_manager::vllm::connector::leader::slot::{
     ConnectorSlotManager, SlotManager, SlotState,
 };
-use crate::llm::block_manager::BlockManagerBuilder;
 use crate::llm::block_manager::{distributed::KvbmLeader as PyKvbmLeader, vllm::KvbmRequest};
-use crate::DistributedRuntime as PyDistributedRuntime;
 use anyhow;
 use dynamo_llm::block_manager::metrics_kvbm::KvbmMetrics;
 use dynamo_runtime::metrics::prometheus_names::kvbm_connector;
diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_worker.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_worker.rs
index e7095e9e57..5a25ac7afd 100644
--- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_worker.rs
+++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/trtllm_worker.rs
@@ -13,15 +13,15 @@ use super::*;
 use crate::llm::block_manager::distributed::get_barrier_id_prefix;
 use crate::llm::block_manager::vllm::connector::worker::event_sync_blocking;
 use crate::{
-    llm::block_manager::distributed::VllmTensor, to_pyerr,
-    DistributedRuntime as PyDistributedRuntime,
+    DistributedRuntime as PyDistributedRuntime, llm::block_manager::distributed::VllmTensor,
+    to_pyerr,
 };
 use anyhow;

 use dynamo_llm::block_manager::distributed::{KvbmWorker, KvbmWorkerConfig};
 use dynamo_llm::block_manager::storage::torch::TorchTensor;
-use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
 use dynamo_runtime::DistributedRuntime;
+use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;

 pub trait Worker: Send + Sync {
     fn register_kv_caches(
@@ -274,7 +274,10 @@ impl Worker for KvConnectorWorker {
                 .maybe_finished_offloading
                 .contains(&request_id.to_string())
             {
-                tracing::warn!(request_id, "possibly got a duplicate finished request; request_id already in the maybe_finished_offloading set");
+                tracing::warn!(
+                    request_id,
+                    "possibly got a duplicate finished request; request_id already in the maybe_finished_offloading set"
+                );
             } else {
                 tracing::debug!(
                     request_id,
@@ -300,7 +303,10 @@ impl Worker for KvConnectorWorker {
                 .maybe_finished_onboarding
                 .contains(&request_id.to_string())
             {
-                tracing::warn!(request_id, "possibly got a duplicate finished request; request_id already in the maybe_finished_onboarding set");
+                tracing::warn!(
+                    request_id,
+                    "possibly got a duplicate finished request; request_id already in the maybe_finished_onboarding set"
+                );
             }
         }
@@ -316,7 +322,9 @@ impl Worker for KvConnectorWorker {
             } else {
                 // made this condition more strict slot existence checks were added as a prerequesite
                 // to be added to the maybe_finished_offloading set.
-                panic!("request slot missing for {request_id}; however, it was present when added to the maybe finished offloading set");
+                panic!(
+                    "request slot missing for {request_id}; however, it was present when added to the maybe finished offloading set"
+                );
             }
         }
@@ -329,7 +337,10 @@ impl Worker for KvConnectorWorker {
             if self.connector.has_slot(request_id) {
                 self.connector.remove_slot(request_id);
             } else {
-                tracing::debug!(request_id, "is_finished_offloading: request slot is not found - likely aborted, removing from is finished offloading set");
+                tracing::debug!(
+                    request_id,
+                    "is_finished_offloading: request slot is not found - likely aborted, removing from is finished offloading set"
+                );
             }
         }
@@ -343,7 +354,9 @@ impl Worker for KvConnectorWorker {
                     tracing::debug!(request_id, "request slot is not finished onboarding");
                 }
             } else {
-                panic!("request slot missing for {request_id}; however, it was present when added to the maybe finished onboarding set");
+                panic!(
+                    "request slot missing for {request_id}; however, it was present when added to the maybe finished onboarding set"
+                );
             }
         }
diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs b/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs
index 94bc37dc49..8c8f73582e 100644
--- a/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs
+++ b/lib/bindings/python/rust/llm/block_manager/vllm/connector/worker.rs
@@ -13,16 +13,16 @@ use std::sync::{Arc, OnceLock};

 use super::*;
 use crate::llm::block_manager::distributed::get_barrier_id_prefix;
 use crate::{
-    llm::block_manager::distributed::VllmTensor, to_pyerr,
-    DistributedRuntime as PyDistributedRuntime,
+    DistributedRuntime as PyDistributedRuntime, llm::block_manager::distributed::VllmTensor,
+    to_pyerr,
 };
 use dynamo_runtime::metrics::prometheus_names::kvbm_connector;

 use anyhow;
 use dynamo_llm::block_manager::distributed::{KvbmWorker, KvbmWorkerConfig};
 use dynamo_llm::block_manager::storage::torch::TorchTensor;
-use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;
 use dynamo_runtime::DistributedRuntime;
+use dynamo_runtime::utils::task::CriticalTaskExecutionHandle;

 pub trait Worker: Send + Sync {
     fn register_kv_caches(
@@ -326,7 +326,10 @@ impl Worker for KvConnectorWorker {
                 "got a finished warning for a request that is onboarding"
             );
         } else if self.maybe_finished_offloading.contains(&request_id) {
-            tracing::warn!(request_id, "possibly got a duplicate finished request; request_id already in the maybe_finished_offloading set");
+            tracing::warn!(
+                request_id,
+                "possibly got a duplicate finished request; request_id already in the maybe_finished_offloading set"
+            );
         } else {
             tracing::debug!(
                 request_id,
@@ -348,7 +351,9 @@ impl Worker for KvConnectorWorker {
             } else {
                 // made this condition more strict slot existence checks were added as a prerequesite
                 // to be added to the maybe_finished_offloading set.
-                panic!("request slot missing for {request_id}; however, it was present when added to the maybe finished offloading set");
+                panic!(
+                    "request slot missing for {request_id}; however, it was present when added to the maybe finished offloading set"
+                );
             }
         }
@@ -361,7 +366,10 @@ impl Worker for KvConnectorWorker {
             if self.connector.has_slot(request_id) {
                 self.connector.remove_slot(request_id);
             } else {
-                tracing::debug!(request_id, "is_finished_offloading: request slot is not found - likely aborted, removing from is finished offloading set");
+                tracing::debug!(
+                    request_id,
+                    "is_finished_offloading: request slot is not found - likely aborted, removing from is finished offloading set"
+                );
             }
         }
@@ -375,7 +383,9 @@ impl Worker for KvConnectorWorker {
                     tracing::debug!(request_id, "request slot is not finished");
                 }
             } else {
-                panic!("request slot missing for {request_id}; however, it was present when added to the maybe finished onboarding set");
+                panic!(
+                    "request slot missing for {request_id}; however, it was present when added to the maybe finished onboarding set"
+                );
             }
         }
@@ -460,7 +470,7 @@ impl PyKvConnectorWorker {
 }

 use cudarc::driver::sys::{
-    cuCtxGetCurrent, cuEventSynchronize, cudaError_enum, CUcontext, CUevent,
+    CUcontext, CUevent, cuCtxGetCurrent, cuEventSynchronize, cudaError_enum,
 };
 use std::ptr;
diff --git a/lib/bindings/python/rust/llm/block_manager/vllm/slot.rs b/lib/bindings/python/rust/llm/block_manager/vllm/slot.rs
index ae58657136..7049b210a0 100644
--- a/lib/bindings/python/rust/llm/block_manager/vllm/slot.rs
+++ b/lib/bindings/python/rust/llm/block_manager/vllm/slot.rs
@@ -67,7 +67,11 @@ impl std::fmt::Debug for Slot {
             .iter()
             .map(|b| b.block_id())
             .collect::<Vec<_>>();
-        write!(f, "Slot(computed_position: {}, prefill_position: {}, immutable_block_ids: {:?}, mutable_block_ids: {:?})", self.computed_position, self.prefill_position, immutable_block_ids, mutable_block_ids)
+        write!(
+            f,
+            "Slot(computed_position: {}, prefill_position: {}, immutable_block_ids: {:?}, mutable_block_ids: {:?})",
+            self.computed_position, self.prefill_position, immutable_block_ids, mutable_block_ids
+        )
     }
 }
diff --git a/lib/bindings/python/rust/llm/entrypoint.rs b/lib/bindings/python/rust/llm/entrypoint.rs
index 5eeed993e0..8a4d19e263 100644
--- a/lib/bindings/python/rust/llm/entrypoint.rs
+++ b/lib/bindings/python/rust/llm/entrypoint.rs
@@ -6,9 +6,9 @@ use std::path::PathBuf;

 use pyo3::{exceptions::PyException, prelude::*};

-use dynamo_llm::entrypoint::input::Input;
 use dynamo_llm::entrypoint::EngineConfig as RsEngineConfig;
 use dynamo_llm::entrypoint::RouterConfig as RsRouterConfig;
+use dynamo_llm::entrypoint::input::Input;
 use dynamo_llm::kv_router::KvRouterConfig as RsKvRouterConfig;
 use dynamo_llm::local_model::DEFAULT_HTTP_PORT;
 use dynamo_llm::local_model::{LocalModel, LocalModelBuilder};
diff --git a/lib/bindings/python/rust/llm/kv.rs b/lib/bindings/python/rust/llm/kv.rs
index 4be3fbe59e..78d066bdd6 100644
--- a/lib/bindings/python/rust/llm/kv.rs
+++ b/lib/bindings/python/rust/llm/kv.rs
@@ -8,8 +8,8 @@ use tokio_stream::StreamExt;

 use super::*;
 use crate::Component;
-use llm_rs::kv_router::indexer::compute_block_hash_for_seq;
 use llm_rs::kv_router::indexer::KvIndexerInterface;
+use llm_rs::kv_router::indexer::compute_block_hash_for_seq;
 use llm_rs::kv_router::protocols::ForwardPassMetrics as RsForwardPassMetrics;
 use llm_rs::kv_router::protocols::KvStats as RsKvStats;
 use llm_rs::kv_router::protocols::SpecDecodeStats as RsSpecDecodeStats;
@@ -18,7 +18,7 @@ use rs::traits::events::EventSubscriber;
 use tracing;

 use llm_rs::kv_router::protocols::*;
-use llm_rs::kv_router::publisher::{create_stored_blocks, KvEventSourceConfig};
+use llm_rs::kv_router::publisher::{KvEventSourceConfig, create_stored_blocks};
 use llm_rs::protocols::common::{OutputOptions, SamplingOptions, StopConditions};

 #[pyfunction]
diff --git a/lib/bindings/python/rust/llm/preprocessor.rs b/lib/bindings/python/rust/llm/preprocessor.rs
index db2c6114eb..0504635377 100644
--- a/lib/bindings/python/rust/llm/preprocessor.rs
+++ b/lib/bindings/python/rust/llm/preprocessor.rs
@@ -8,10 +8,10 @@ use llm_rs::{
     preprocessor::OpenAIPreprocessor,
     protocols::common::llm_backend::{BackendOutput, PreprocessedRequest},
     types::{
+        Annotated,
         openai::chat_completions::{
             NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse,
         },
-        Annotated,
     },
 };
diff --git a/lib/llm/src/mocker/sequence.rs b/lib/llm/src/mocker/sequence.rs
index f4ada40283..d467ebaa29 100644
--- a/lib/llm/src/mocker/sequence.rs
+++ b/lib/llm/src/mocker/sequence.rs
@@ -28,7 +28,7 @@ fn create_unique_blocks_from_sequence(
         .collect();

     // Only push the partial block if tokens count isn't a multiple of block_size
-    if tokens.total_tokens() % block_size != 0 {
+    if !tokens.total_tokens().is_multiple_of(block_size) {
         unique_blocks.push(match uuid {
             Some(uuid) => UniqueBlock::PartialBlock(uuid),
             None => UniqueBlock::default(),
@@ -258,7 +258,7 @@ impl ActiveSequence {
         self.generated_tokens = self.generated_tokens.saturating_sub(1);

         // Reverts to the last full block
-        if self.tokens.total_tokens() % self.block_size == 0 {
+        if self.tokens.total_tokens().is_multiple_of(self.block_size) {
             self.unique_blocks.pop();
         }
     }
diff --git a/lib/runtime/examples/Cargo.toml b/lib/runtime/examples/Cargo.toml
index 531561ee96..4b8d0881ed 100644
--- a/lib/runtime/examples/Cargo.toml
+++ b/lib/runtime/examples/Cargo.toml
@@ -11,7 +11,7 @@ resolver = "3"

 [workspace.package]
 version = "0.5.0"
-edition = "2021"
+edition = "2024"
 authors = ["NVIDIA"]
 license = "Apache-2.0"
 homepage = "https://github.com/ai-dynamo/dynamo"
diff --git a/lib/runtime/examples/hello_world/src/bin/client.rs b/lib/runtime/examples/hello_world/src/bin/client.rs
index 6a3a3ad53e..0ca98ea136 100644
--- a/lib/runtime/examples/hello_world/src/bin/client.rs
+++ b/lib/runtime/examples/hello_world/src/bin/client.rs
@@ -2,8 +2,8 @@
 // SPDX-License-Identifier: Apache-2.0

 use dynamo_runtime::{
-    logging, pipeline::PushRouter, protocols::annotated::Annotated, stream::StreamExt,
-    DistributedRuntime, Result, Runtime, Worker,
+    DistributedRuntime, Result, Runtime, Worker, logging, pipeline::PushRouter,
+    protocols::annotated::Annotated, stream::StreamExt,
 };
 use hello_world::DEFAULT_NAMESPACE;
diff --git a/lib/runtime/examples/hello_world/src/bin/server.rs b/lib/runtime/examples/hello_world/src/bin/server.rs
index ed4807a32a..3f4f9bea77 100644
--- a/lib/runtime/examples/hello_world/src/bin/server.rs
+++ b/lib/runtime/examples/hello_world/src/bin/server.rs
@@ -2,13 +2,13 @@
 // SPDX-License-Identifier: Apache-2.0

 use dynamo_runtime::{
-    logging,
+    DistributedRuntime, Result, Runtime, Worker, logging,
     pipeline::{
-        async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut,
-        ResponseStream, SingleIn,
+        AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn,
+        async_trait, network::Ingress,
     },
     protocols::annotated::Annotated,
-    stream, DistributedRuntime, Result, Runtime, Worker,
+    stream,
 };
 use hello_world::DEFAULT_NAMESPACE;
 use std::sync::Arc;
diff --git a/lib/runtime/examples/rust-toolchain.toml b/lib/runtime/examples/rust-toolchain.toml
index b67e7d5348..ff100edcbb 100644
--- a/lib/runtime/examples/rust-toolchain.toml
+++ b/lib/runtime/examples/rust-toolchain.toml
@@ -1,2 +1,2 @@
 [toolchain]
-channel = "1.89.0"
+channel = "1.90.0"
diff --git a/lib/runtime/examples/service_metrics/src/bin/service_client.rs b/lib/runtime/examples/service_metrics/src/bin/service_client.rs
index ccb939b218..17477372ed 100644
--- a/lib/runtime/examples/service_metrics/src/bin/service_client.rs
+++ b/lib/runtime/examples/service_metrics/src/bin/service_client.rs
@@ -5,8 +5,8 @@ use futures::StreamExt;
 use service_metrics::DEFAULT_NAMESPACE;

 use dynamo_runtime::{
-    logging, pipeline::PushRouter, protocols::annotated::Annotated, utils::Duration,
-    DistributedRuntime, Result, Runtime, Worker,
+    DistributedRuntime, Result, Runtime, Worker, logging, pipeline::PushRouter,
+    protocols::annotated::Annotated, utils::Duration,
 };

 fn main() -> Result<()> {
diff --git a/lib/runtime/examples/service_metrics/src/bin/service_server.rs b/lib/runtime/examples/service_metrics/src/bin/service_server.rs
index 7a970521cf..c331c7fcd0 100644
--- a/lib/runtime/examples/service_metrics/src/bin/service_server.rs
+++ b/lib/runtime/examples/service_metrics/src/bin/service_server.rs
@@ -1,16 +1,16 @@
 // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 // SPDX-License-Identifier: Apache-2.0

-use service_metrics::{MyStats, DEFAULT_NAMESPACE};
+use service_metrics::{DEFAULT_NAMESPACE, MyStats};

 use dynamo_runtime::{
-    logging,
+    DistributedRuntime, Result, Runtime, Worker, logging,
     pipeline::{
-        async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut,
-        ResponseStream, SingleIn,
+        AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn,
+        async_trait, network::Ingress,
     },
     protocols::annotated::Annotated,
-    stream, DistributedRuntime, Result, Runtime, Worker,
+    stream,
 };
 use std::sync::Arc;
diff --git a/lib/runtime/examples/system_metrics/src/bin/system_client.rs b/lib/runtime/examples/system_metrics/src/bin/system_client.rs
index baef2b2d89..0548d36e9b 100644
--- a/lib/runtime/examples/system_metrics/src/bin/system_client.rs
+++ b/lib/runtime/examples/system_metrics/src/bin/system_client.rs
@@ -5,8 +5,8 @@ use futures::StreamExt;
 use system_metrics::{DEFAULT_COMPONENT, DEFAULT_ENDPOINT, DEFAULT_NAMESPACE};

 use dynamo_runtime::{
-    logging, pipeline::PushRouter, protocols::annotated::Annotated, utils::Duration,
-    DistributedRuntime, Result, Runtime, Worker,
+    DistributedRuntime, Result, Runtime, Worker, logging, pipeline::PushRouter,
+    protocols::annotated::Annotated, utils::Duration,
 };

 fn main() -> Result<()> {
diff --git a/lib/runtime/examples/system_metrics/src/bin/system_server.rs b/lib/runtime/examples/system_metrics/src/bin/system_server.rs
index 649656dac5..7bed2472a0 100644
--- a/lib/runtime/examples/system_metrics/src/bin/system_server.rs
+++ b/lib/runtime/examples/system_metrics/src/bin/system_server.rs
@@ -1,7 +1,7 @@
 // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 // SPDX-License-Identifier: Apache-2.0

-use dynamo_runtime::{logging, DistributedRuntime, Result, Runtime, Worker};
+use dynamo_runtime::{DistributedRuntime, Result, Runtime, Worker, logging};
 use system_metrics::backend;

 fn main() -> Result<()> {
diff --git a/lib/runtime/examples/system_metrics/src/lib.rs b/lib/runtime/examples/system_metrics/src/lib.rs
index 133555e750..a4517aa0a1 100644
--- a/lib/runtime/examples/system_metrics/src/lib.rs
+++ b/lib/runtime/examples/system_metrics/src/lib.rs
@@ -2,13 +2,14 @@
 // SPDX-License-Identifier: Apache-2.0

 use dynamo_runtime::{
+    DistributedRuntime, Result,
     metrics::MetricsRegistry,
     pipeline::{
-        async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut,
-        ResponseStream, SingleIn,
+        AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn,
+        async_trait, network::Ingress,
     },
     protocols::annotated::Annotated,
-    stream, DistributedRuntime, Result,
+    stream,
 };
 use prometheus::IntCounter;
 use std::sync::Arc;
diff --git a/lib/runtime/examples/system_metrics/tests/integration_test.rs b/lib/runtime/examples/system_metrics/tests/integration_test.rs
index 9a8c56f9a0..3568b37abb 100644
--- a/lib/runtime/examples/system_metrics/tests/integration_test.rs
+++ b/lib/runtime/examples/system_metrics/tests/integration_test.rs
@@ -4,14 +4,14 @@
 #![cfg(feature = "integration")]

 use dynamo_runtime::{
-    pipeline::PushRouter, protocols::annotated::Annotated, DistributedRuntime, Result, Runtime,
+    DistributedRuntime, Result, Runtime, pipeline::PushRouter, protocols::annotated::Annotated,
 };
 use futures::StreamExt;
 use rand::Rng;
 use reqwest;
 use std::env;
-use system_metrics::{backend, DEFAULT_COMPONENT, DEFAULT_ENDPOINT, DEFAULT_NAMESPACE};
-use tokio::time::{sleep, Duration};
+use system_metrics::{DEFAULT_COMPONENT, DEFAULT_ENDPOINT, DEFAULT_NAMESPACE, backend};
+use tokio::time::{Duration, sleep};

 #[tokio::test]
 async fn test_backend_with_metrics() -> Result<()> {
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
index b67e7d5348..ff100edcbb 100644
--- a/rust-toolchain.toml
+++ b/rust-toolchain.toml
@@ -1,2 +1,2 @@
 [toolchain]
-channel = "1.89.0"
+channel = "1.90.0"