Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore: cleanups
Signed-off-by: Alexandre Milesi <[email protected]>
  • Loading branch information
milesial committed Nov 10, 2025
commit a2687d285a37c7488641aed92271ff15032c86bf
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion lib/llm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ dialoguer = { version = "0.11", default-features = false, features = [

# block_manager
aligned-vec = { version = "0.6.4", optional = true }
nixl-sys = { version = "0.7", optional = true }
nixl-sys = { version = "=0.7.0", optional = true }
cudarc = { workspace = true, optional = true }
nix = { version = "0.26", optional = true }

Expand Down
1 change: 0 additions & 1 deletion lib/llm/src/mocker/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<LLMEngineOutput>, Error>
input: SingleIn<PreprocessedRequest>,
) -> Result<ManyOut<LLMEngineOutput>, Error> {
let (request, ctx) = input.into_parts();
println!("request: {request:?}");

// Extract dp_rank from request field (defaults to 0 if not set)
let dp_rank = request.dp_rank.unwrap_or(0);
Expand Down
12 changes: 9 additions & 3 deletions lib/llm/src/preprocessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tracing;

use crate::model_card::{ModelDeploymentCard, ModelInfo};
#[cfg(feature = "media-nixl")]
use crate::preprocessor::media::{MediaDecoder, MediaLoader, MediaFetcher};
use crate::preprocessor::media::{MediaDecoder, MediaFetcher, MediaLoader};
use crate::preprocessor::prompt::OAIChatLikeRequest;
use crate::protocols::common::preprocessor::{
MultimodalData, MultimodalDataMap, PreprocessedRequestBuilder,
Expand Down Expand Up @@ -145,8 +145,13 @@ impl OpenAIPreprocessor {

// Initialize runtime config from the ModelDeploymentCard
let runtime_config = mdc.runtime_config.clone();

#[cfg(feature = "media-nixl")]
let media_loader = Some(MediaLoader::new(MediaDecoder::default(), MediaFetcher::default())?);
let media_loader = match mdc.media_decoder {
Some(media_decoder) => Some(MediaLoader::new(media_decoder, mdc.media_fetcher)?),
None => None,
};

Ok(Arc::new(Self {
formatter,
tokenizer,
Expand Down Expand Up @@ -284,7 +289,8 @@ impl OpenAIPreprocessor {
let message_count = messages.len().unwrap_or(0);
let mut media_map: MultimodalDataMap = HashMap::new();
#[cfg(feature = "media-nixl")]
let mut fetch_tasks: Vec<(String, ChatCompletionRequestUserMessageContentPart)> = Vec::new();
let mut fetch_tasks: Vec<(String, ChatCompletionRequestUserMessageContentPart)> =
Vec::new();

for idx in 0..message_count {
let msg = messages
Expand Down
36 changes: 35 additions & 1 deletion lib/llm/src/preprocessor/media/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,39 @@

This component performs media download, base64 decoding, media decoding, and NIXL registration. Today it is used in the OpenAI preprocessor to transform multimodal inputs (`image_url`, `video_url`, `audio_url`) into fully decoded data (pixel values, ...) that the backends can access via NIXL.

## Usage

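Media decoding is enabled when registering the MDC (`ModelDeploymentCard`): it is active only when a `media_decoder` is provided, while `media_fetcher` is optional and falls back to its default settings when omitted.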

Set HTTP download options:

```python
from dynamo.llm import MediaFetcher
fetcher = MediaFetcher()
fetcher.user_agent("dynamo")
fetcher.timeout_ms(15000)
fetcher.allow_direct_ip(True)
fetcher.allow_direct_port(False)
fetcher.allowed_media_domains(["google.com"])
```

Set media decoding options:

```python
from dynamo.llm import MediaDecoder
decoder = MediaDecoder()
decoder.image_decoder({"max_image_width": 4096, "max_image_height": 4096, "max_alloc": 16*1024*1024})
```

And register the LLM as usual, adding the media configuration:

```python
register_llm(
...,
media_decoder=decoder,
media_fetcher=fetcher,
)
```


## TODOs
Expand All @@ -25,5 +58,6 @@ This component performs media download, base64 decoding, media decoding and NIXL
- [ ] Memory spilling to lower storage tiers
- [ ] Early-free memory on client notifications

### Observability
### Misc
- [ ] Observability on performance, memory usage and input distributions
- [ ] Per-request decoding options
24 changes: 11 additions & 13 deletions lib/llm/src/preprocessor/media/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,15 @@ use anyhow::Result;

use dynamo_async_openai::types::ChatCompletionRequestUserMessageContentPart;

use super::decoders::{MediaDecoder};
use super::decoders::MediaDecoder;
use super::rdma::RdmaMediaDataDescriptor;

#[cfg(feature = "media-nixl")]
use {
super::rdma::get_nixl_agent,
super::common::EncodedMediaData, super::decoders::Decoder, super::rdma::get_nixl_agent,
dynamo_memory::nixl::NixlAgent,
super::common::EncodedMediaData,
super::decoders::Decoder
};


const DEFAULT_HTTP_USER_AGENT: &str = "dynamo-ai/dynamo";
const DEFAULT_HTTP_TIMEOUT: Duration = Duration::from_secs(30);

Expand Down Expand Up @@ -55,8 +52,9 @@ pub struct MediaLoader {
}

impl MediaLoader {
pub fn new(media_decoder: MediaDecoder, media_fetcher: MediaFetcher) -> Result<Self> {
let mut http_client_builder =
pub fn new(media_decoder: MediaDecoder, media_fetcher: Option<MediaFetcher>) -> Result<Self> {
let media_fetcher = media_fetcher.unwrap_or_default();
let mut http_client_builder: reqwest::ClientBuilder =
reqwest::Client::builder().user_agent(&media_fetcher.user_agent);

if let Some(timeout) = media_fetcher.timeout {
Expand Down Expand Up @@ -109,7 +107,9 @@ impl MediaLoader {
// TODO: request-level options
) -> Result<RdmaMediaDataDescriptor> {
#[cfg(not(feature = "media-nixl"))]
anyhow::bail!("NIXL is not supported, cannot decode and register media data {oai_content_part:?}");
anyhow::bail!(
"NIXL is not supported, cannot decode and register media data {oai_content_part:?}"
);

#[cfg(feature = "media-nixl")]
{
Expand All @@ -133,11 +133,9 @@ impl MediaLoader {
_ => anyhow::bail!("Unsupported media type"),
};


let rdma_descriptor = decoded.into_rdma_descriptor(&self.nixl_agent)?;
Ok(rdma_descriptor)
}

}
}

Expand Down Expand Up @@ -225,7 +223,7 @@ mod tests_non_nixl {
allow_direct_ip: false,
..Default::default()
};
let loader = MediaLoader::new(MediaDecoder::default(), fetcher).unwrap();
let loader = MediaLoader::new(MediaDecoder::default(), Some(fetcher)).unwrap();

let url = url::Url::parse("http://192.168.1.1/image.jpg").unwrap();
let result = loader.check_if_url_allowed(&url);
Expand All @@ -245,7 +243,7 @@ mod tests_non_nixl {
allow_direct_port: false,
..Default::default()
};
let loader = MediaLoader::new(MediaDecoder::default(), fetcher).unwrap();
let loader = MediaLoader::new(MediaDecoder::default(), Some(fetcher)).unwrap();

let url = url::Url::parse("http://example.com:8080/image.jpg").unwrap();
let result = loader.check_if_url_allowed(&url);
Expand All @@ -269,7 +267,7 @@ mod tests_non_nixl {
allowed_media_domains: Some(allowed_domains),
..Default::default()
};
let loader = MediaLoader::new(MediaDecoder::default(), fetcher).unwrap();
let loader = MediaLoader::new(MediaDecoder::default(), Some(fetcher)).unwrap();

// Allowed domain should pass
let url = url::Url::parse("https://trusted.com/image.jpg").unwrap();
Expand Down
2 changes: 1 addition & 1 deletion lib/llm/src/preprocessor/media/rdma.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use serde::{Deserialize, Serialize};

#[cfg(feature = "media-nixl")]
use {
base64::{Engine as _, engine::general_purpose},
dynamo_memory::SystemStorage,
dynamo_memory::nixl::{self, NixlAgent, NixlDescriptor, RegisteredView},
base64::{Engine as _, engine::general_purpose},
std::sync::Arc,
};

Expand Down
Loading