diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index d64956b84..378e6ab57 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -23,6 +23,7 @@ else() add_subdirectory(quantize) if (LLAMA_BUILD_SERVER) add_subdirectory(server) + add_subdirectory(router) endif() add_subdirectory(run) add_subdirectory(tokenize) diff --git a/tools/router/ARCHITECTURE.md b/tools/router/ARCHITECTURE.md new file mode 100644 index 000000000..3c60f258d --- /dev/null +++ b/tools/router/ARCHITECTURE.md @@ -0,0 +1,264 @@ +# llama-router Architecture + +Technical documentation for developers and contributors. + +--- + +## Design Philosophy + +llama-router follows KISS (Keep It Simple, Stupid) principles: + +- **Minimal configuration**: Works out-of-box with HF cache scanning +- **Explicit persistence**: Config changes are written explicitly via admin endpoints, never hidden in business logic +- **Separation of concerns**: Core routing logic (`RouterApp`) has zero I/O, persistence handled by admin layer +- **Simple endpoint matching**: Prefix-based matching, no complex regex +- **Transparent proxy**: Headers and streaming forwarded as-is +- **On-demand by default**: Models start on first request; `startup_model` can preload a chosen backend at boot +- **Plug-and-play defaults**: Hugging Face downloads set `startup_model` automatically when it is unset +- **Transparent operations**: Optional real-time notifications for swap feedback via SSE + +### The auto + default_spawn Workflow + +Models discovered from the HuggingFace cache are marked as `auto` and inherit the `default_spawn` configuration. This creates a powerful optimization pattern: + +1. **Tune `default_spawn` once** with your preferred parameters (GPU layers, KV cache quantization, context size, etc.) +2. **All `auto` models automatically use these settings** - no per-model configuration needed +3. **Change `default_spawn` and reload** - all `auto` models instantly updated +4. **Customize individual models** by switching to `manual` state first to prevent rescan overwrites + +This ensures consistent, optimized behavior across your entire model collection while allowing per-model overrides when needed. **Always set models to `manual` before customizing their spawn parameters** - otherwise your changes will be lost on the next rescan. + +## Multi-Engine Support + +llama-router is engine-agnostic. Any OpenAI-compatible inference backend can be orchestrated by configuring the appropriate spawn command and endpoints. The router simply: + +1. Spawns the command specified in `spawn.command` +2. Polls `health_endpoint` until it returns HTTP 200 (customizable per backend) +3. Proxies requests matching `proxy_endpoints` to the running instance + +This design allows you to mix llama.cpp, vLLM, Ollama, Text Generation Inference, or any custom backend in a single router configuration. Set models to `manual` state when using non-llama.cpp backends to prevent automatic cache rescans from removing them. + +### Future: WebUI Administration (TODO) + +The admin API endpoints (`/admin/reload`, `/admin/rescan`) are designed to support hot configuration and model management. 
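+
+Both endpoints are plain HTTP and can already be driven from a shell (sketch; `$ADMIN_TOKEN` is an illustrative variable, and the token headers are only needed when `admin_token` is set):
+
+```bash
+# Stop all running backends (they respawn on demand at the next request)
+curl -X POST -H "Authorization: Bearer $ADMIN_TOKEN" http://localhost:8082/admin/reload
+
+# Rescan the HF cache; added/removed auto models are persisted to the config file
+curl -H "X-Admin-Token: $ADMIN_TOKEN" http://localhost:8082/admin/rescan
+```
+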
A future WebUI will enable: + +- **Live model downloads** from HuggingFace directly through the interface +- **Hot reconfiguration** of `default_spawn` and per-model settings without restart +- **Real-time monitoring** of running instances and resource usage +- **Interactive model management** (add, remove, customize spawn parameters) + +This aligns with the project philosophy: **everything configurable at runtime, zero downtime required**. The current CLI and JSON-based workflow is production-ready; the WebUI will provide a more accessible interface to the same underlying admin API. + +--- + +## Architecture + +### System Architecture + +``` +┌─────────────────────────────────────────────────────────────┐ +│ llama-router │ +│ (port 8082) │ +├─────────────────────────────────────────────────────────────┤ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │ +│ │ Config │ │ Scanner │ │ Process Manager │ │ +│ │ Loader │ │ (HF cache) │ │ (spawn/terminate) │ │ +│ └─────────────┘ └─────────────┘ └─────────────────────┘ │ +│ │ │ +│ ┌─────────────────────────┴────────────────────────────┐ │ +│ │ HTTP Proxy │ │ +│ │ (streaming support, header forwarding) │ │ +│ └──────────────────────────────────────────────────────┘ │ +└──────────────────────────┬──────────────────────────────────┘ + │ + ┌──────────────────┼──────────────────┐ + ▼ ▼ ▼ +┌───────────────┐ ┌───────────────┐ ┌───────────────┐ +│ llama-server │ │ llama-server │ │ llama-server │ +│ (port 50000) │ │ (port 50001) │ │ (port 50002) │ +│ Model A │ │ Model B │ │ Model C │ +└───────────────┘ └───────────────┘ └───────────────┘ +``` + +### Request Flow + +1. Client sends POST to `/v1/chat/completions` with `"model": "ModelA"` +2. Router checks if ModelA is already running +3. If not running, or if a conflicting group is active: + - Terminate conflicting backends + - Spawn new llama-server with assigned port + - Poll `/health` until ready (`ROUTER_BACKEND_READY_TIMEOUT_MS` timeout) +4. Forward request to backend, streaming response back to client +5. 
Backend remains running for subsequent requests + +### Process Lifecycle + +- **Spawn**: `fork()`/`CreateProcess()` with stdout/stderr capture +- **Health polling**: `ROUTER_BACKEND_HEALTH_POLL_MS` intervals, `ROUTER_BACKEND_READY_TIMEOUT_MS` timeout +- **Graceful shutdown**: SIGTERM → wait `ROUTER_PROCESS_SHUTDOWN_TIMEOUT_MS` → SIGKILL → poll every `ROUTER_PROCESS_POLL_INTERVAL_MS` until exit +- **Cleanup**: File descriptors closed, waitpid() called + +--- + +## File Structure & Separation of Concerns + +| Component | Files | Responsibility | +|-----------|-------|----------------| +| **Core** | `router-app.cpp/h` | Model lifecycle, spawn orchestration, group logic, progress notification emission (zero I/O except notifications) | +| **HTTP Endpoints** | `router-endpoints.cpp/h` | Public API routes (`/v1/models`, `/v1/chat/completions`) | +| **Admin** | `router-admin.cpp/h` | Admin routes with explicit config persistence | +| **Proxy** | `router-proxy.cpp/h` | HTTP forwarding, SSE streaming, header management | +| **Process** | `router-process.cpp/h` | Cross-platform subprocess spawning, I/O capture | +| **Config** | `router-config.cpp/h` | JSON load/write, rescan logic, `RescanResult` | +| **Scanner** | `router-scanner.cpp/h` | HF cache discovery, `--import-dir`, mmproj detection | +| **Main** | `router.cpp` | CLI parsing, server setup, signal handlers | +| **Utils** | `logging.cpp/h`, `router-constants.h` | Shared logging and constants | + +**Design principles enforced:** +- `router-app`: Pure business logic, no filesystem I/O +- `router-admin`: Owns config persistence, explicit writes only +- `router-proxy`: Streaming & forwarding, value-captured lambdas to avoid use-after-free +- `router-process`: Platform abstraction, child processes never call parent logging functions + +--- + +## Technical Notes + +### Cross-Platform Process Management + +The router handles subprocess spawning differently per platform: + +**Linux/macOS:** Uses `fork()` + `execvp()` with careful attention to post-fork behavior. Child processes **must not** call logging functions that access parent singletons - they write directly to `STDERR_FILENO` instead to avoid use-after-fork crashes. + +**Windows:** Uses `CreateProcess()` with separate process information structures and handle management. + +### SSE Streaming Implementation + +Server-Sent Events streaming required careful lifetime management to avoid use-after-free bugs: + +1. **Capture by value**: Lambda captures must copy request data (headers, path, body), not reference stack variables that become invalid after the handler returns +2. **Explicit termination**: Call `sink.done()` followed by `return false` to signal httplib to close the connection properly - without this, streams deliver tokens correctly but never terminate + +### PATH Binary Resolution + +Spawn commands support both absolute/relative paths and PATH-based binaries: + +- **Paths with separators**: `/usr/bin/llama-server`, `./llama-server`, `C:\llama\server.exe` - existence validated before spawn +- **PATH binaries**: `python`, `vllm`, `ollama`, `llama-server` - no validation, relies on shell PATH resolution + +The router only validates file existence for commands containing `/` or `\\` path separators, allowing seamless use of system-installed binaries. 
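+
+A minimal sketch of that rule (illustrative helper, not the router's actual symbol names):
+
+```cpp
+#include <filesystem>
+#include <string>
+#include <system_error>
+
+// Only commands that look like filesystem paths are validated up front;
+// bare names such as "vllm" or "llama-server" are left to PATH resolution at spawn time.
+static bool spawn_command_is_usable(const std::string & cmd) {
+    const bool looks_like_path =
+        cmd.find('/') != std::string::npos || cmd.find('\\') != std::string::npos;
+    if (!looks_like_path) {
+        return true;
+    }
+    std::error_code ec;
+    return std::filesystem::exists(cmd, ec) && !ec;
+}
+```
+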
+ +### Spawn Command Placeholders + +The router expands placeholders in spawn commands: +- `$path` → The model file path from `path` field +- `$port` → Dynamically assigned port (increments from `base_port`) +- `$host` → Always expands to `127.0.0.1` for security + +### Model-Scoped Route Stripping + +Routes like `//health` are router-side aliases for convenience. Before proxying to the backend, the router strips the model prefix: + +- User request: `GET /Qwen3-8B-Q4_K_M.gguf/health` +- Forwarded to backend: `GET /health` + +Backends remain unaware of model-scoped routing - they expose standard endpoints like `/health`, `/v1/chat/completions`, etc. + +### HTTP Header Management + +The router strips `Content-Length` and `Transfer-Encoding` headers before forwarding requests. This is standard reverse-proxy behavior to handle chunked requests/responses properly and avoid conflicts when the proxy re-chunks data. + +All other headers are forwarded transparently to preserve client context (authentication, user-agent, etc.). + +### Real-Time Swap Notifications + +The router implements an opt-in notification system for streaming swap progress to clients: + +**Architecture:** +- `NotificationSink`: Function-based callback system in `router-config.h` +- `RouterApp::set/clear_notification_sink()`: Attach/detach sink before/after operations +- Progress emitted at 3 lifecycle points in `ensure_running()`: + * After `terminate_process()` - unload notification + * After `spawn_process()` - load notification + * After `wait_for_backend_ready()` - ready notification + +**Implementation:** +The proxy layer owns the full request lifecycle. For streaming requests with `notify_model_swap=true`: +1. Attach sink that enqueues formatted SSE chunks into the stream state +2. Call `ensure_running()` - notifications flow directly into the SSE queue +3. Clear sink before forwarding to backend (prevents backend logs in stream) + +Messages use OpenAI-compatible `delta.reasoning_content` field, prefixed with `[llama-router]` to distinguish router operations from model reasoning. + +**Design rationale:** +- Sink pattern allows clean separation: RouterApp emits events, proxy consumes them +- Notifications sent synchronously during operations = accurate timing perception +- Thread-safe via separate `notification_mutex` to avoid deadlock with main mutex +- Zero overhead when disabled (sink check + early return) + +### Health Endpoint Purpose + +The `health_endpoint` configuration field serves **spawn readiness polling only** - the router uses it to detect when a backend has finished loading and is ready to serve requests. + +This is separate from user-facing health routes. Clients can still call `//health` or `/health` for their own monitoring needs. The backend must expose standard endpoints regardless of what `health_endpoint` is configured for polling. + +### Multimodal Projector Priority + +When importing collections with `--import-dir`, mmproj files are automatically detected with this search priority: + +1. `*-bf16.gguf` (selected first) +2. `*-f16.gguf` (selected if BF16 not found) +3. `*-f32.gguf` (selected if neither BF16 nor F16 found) + +All quantization variants of a model (Q4_K_M, Q5_K_M, Q6_K, etc.) found in the same directory share the same mmproj file. + +**For manual models:** mmproj auto-detection applies only during initial import. You can edit `spawn.command` to remove `--mmproj` if unwanted - your changes persist across restarts. Only `auto` models get their spawn configuration regenerated on rescan. 
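+
+For reference, the priority above amounts to a small suffix scan, sketched here (illustrative only; the actual matching is implemented in `router-scanner.cpp`):
+
+```cpp
+#include <string>
+#include <vector>
+
+// Sketch of the BF16 > F16 > F32 preference (case handling omitted for brevity).
+static bool has_suffix(const std::string & s, const std::string & suffix) {
+    return s.size() >= suffix.size() &&
+           s.compare(s.size() - suffix.size(), suffix.size(), suffix) == 0;
+}
+
+static std::string pick_mmproj(const std::vector<std::string> & candidates) {
+    for (const std::string suffix : { "-bf16.gguf", "-f16.gguf", "-f32.gguf" }) {
+        for (const auto & file : candidates) {
+            if (has_suffix(file, suffix)) {
+                return file;   // highest available precision wins
+            }
+        }
+    }
+    return {};                 // no projector found: model is configured without --mmproj
+}
+```
+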
+ +### Manifest Robustness + +The HF cache scanner gracefully handles missing or corrupted manifest files: + +- If `~/.cache/llama.cpp/` doesn't exist, scanner returns empty mapping +- If individual manifest files are missing, they're silently skipped +- Models without manifest entries load successfully, just without mmproj auto-detection + +**Cache structure example:** +``` +~/.cache/llama.cpp/ +├── bartowski_Qwen2.5-1.5B-Instruct-GGUF_Qwen2.5-1.5B-Instruct-Q4_K_M.gguf +├── bartowski_Qwen2.5-1.5B-Instruct-GGUF_Qwen2.5-1.5B-Instruct-Q4_K_M.gguf.etag +├── manifest=bartowski=Qwen2.5-1.5B-Instruct-GGUF=latest.json +├── unsloth_Qwen3-VL-4B-Instruct-GGUF_Qwen3-VL-4B-Instruct-Q6_K.gguf +├── unsloth_Qwen3-VL-4B-Instruct-GGUF_Qwen3-VL-4B-Instruct-Q6_K.gguf.etag +├── unsloth_Qwen3-VL-4B-Instruct-GGUF_mmproj-F16.gguf +├── unsloth_Qwen3-VL-4B-Instruct-GGUF_mmproj-F16.gguf.etag +└── manifest=unsloth=Qwen3-VL-4B-Instruct-GGUF=Q6_K.json +``` + +Manifest files (`manifest=vendor=repo=quant.json`) contain metadata for mmproj auto-detection. The scanner uses underscore separators: `vendor_repo_filename.gguf`. + +This ensures the router remains operational even with incomplete cache metadata. + +--- + +## Signals and Shutdown + +The router handles graceful shutdown on: +- `SIGINT` (Ctrl+C) +- `SIGTERM` + +Shutdown sequence: +1. Stop accepting new connections +2. Terminate all managed llama-server processes +3. Wait for process cleanup +4. Exit + +--- + +## Contributing + +llama-router is part of the llama.cpp project. Contributions welcome via pull request. + +## License + +MIT License - See llama.cpp repository for details. diff --git a/tools/router/CMakeLists.txt b/tools/router/CMakeLists.txt new file mode 100644 index 000000000..79b0d7e2d --- /dev/null +++ b/tools/router/CMakeLists.txt @@ -0,0 +1,35 @@ +set(TARGET llama-router) + +set(TARGET_SRCS + router.cpp + router-app.cpp + router-config.cpp + router-admin.cpp + router-endpoints.cpp + logging.cpp + router-process.cpp + router-proxy.cpp + router-scanner.cpp +) + +set(PUBLIC_ASSETS + index.html.gz +) + +foreach(asset ${PUBLIC_ASSETS}) + set(input "${PROJECT_SOURCE_DIR}/tools/server/public/${asset}") + set(output "${CMAKE_CURRENT_BINARY_DIR}/${asset}.hpp") + list(APPEND TARGET_SRCS ${output}) + add_custom_command( + DEPENDS "${input}" + OUTPUT "${output}" + COMMAND "${CMAKE_COMMAND}" "-DINPUT=${input}" "-DOUTPUT=${output}" -P "${PROJECT_SOURCE_DIR}/scripts/xxd.cmake" + ) + set_source_files_properties(${output} PROPERTIES GENERATED TRUE) +endforeach() + +add_executable(${TARGET} ${TARGET_SRCS}) + +target_link_libraries(${TARGET} PRIVATE common cpp-httplib ${CMAKE_THREAD_LIBS_INIT}) + +target_include_directories(${TARGET} PRIVATE ${PROJECT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) diff --git a/tools/router/README.md b/tools/router/README.md new file mode 100644 index 000000000..d508bc7bd --- /dev/null +++ b/tools/router/README.md @@ -0,0 +1,564 @@ +# llama-router + +A lightweight, cross-platform model orchestrator for llama.cpp that dynamically spawns and manages llama-server instances on demand. + +## Overview + +llama-router acts as an intelligent proxy that sits in front of your model collection. When a request arrives for a specific model, the router automatically starts the appropriate llama-server backend, waits for it to become ready, and forwards the request transparently. This enables a single API endpoint to serve multiple models without keeping them all loaded in memory simultaneously. 
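+
+In practice this means two requests naming different models can hit the same endpoint, and the router handles the swap behind the scenes (model names below are illustrative):
+
+```bash
+curl http://localhost:8082/v1/chat/completions -H "Content-Type: application/json" \
+  -d '{"model": "Qwen3-8B-Q4_K_M.gguf", "messages": [{"role": "user", "content": "Hello!"}]}'
+
+curl http://localhost:8082/v1/chat/completions -H "Content-Type: application/json" \
+  -d '{"model": "Llama-3.1-8B-Instruct-Q4_K_M.gguf", "messages": [{"role": "user", "content": "Hello!"}]}'
+```
+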
+ +### Key Features + +- **On-demand model loading**: Models are spawned only when requested, conserving GPU memory +- **Automatic discovery**: Scans the Hugging Face cache directory for available models +- **Hugging Face integration**: Download models directly via CLI with `-hf` flag and automatically set the `startup_model` when it is not configured +- **Collection import**: Recursively import local GGUF directories +- **Multimodal support**: Automatically detects and configures mmproj files for vision models +- **Model grouping**: Define groups to ensure only one model from a group runs at a time +- **Optional warm start**: Preload a chosen model on startup via `startup_model` +- **OpenAI-compatible API**: Exposes `/v1/models` and `/v1/chat/completions` endpoints +- **Hot reload**: Admin endpoints for runtime configuration updates + +### Cross-Platform Support + +llama-router builds and runs on both Linux and Windows: + +| Platform | Process Management | Path Handling | +|----------|-------------------|---------------| +| Linux/macOS | `fork()` + `execvp()` | `$HOME`, `/proc/self/exe` | +| Windows | `CreateProcess()` | `%USERPROFILE%`, `GetModuleFileName()` | + +The router automatically detects the llama-server binary location relative to its own executable, falling back to PATH resolution if not found. + +--- + +## Quick Start + +### Building + +llama-router is built as part of the llama.cpp server tools: + +```bash +cmake -B build -DLLAMA_BUILD_SERVER=ON +cmake --build build --target llama-router +``` + +The binary will be located at `build/bin/llama-router`. + +### Running + +Simply launch the router: + +```bash +./llama-router +``` + +On first run, it will: +1. Create a default configuration at `~/.config/llama.cpp/router-config.json` +2. Scan the Hugging Face cache (`~/.cache/llama.cpp/`) for GGUF models +3. Add discovered models as `auto` state, inheriting `default_spawn` configuration +4. Start listening on `127.0.0.1:8082` +5. Optionally preload the configured `startup_model` if set + +On every subsequent startup: +- Automatic rescan updates the model list (adds new, removes deleted cache files) +- All `auto` models inherit the current `default_spawn` settings +- `manual` models preserve their custom configurations + +--- + +## CLI Reference + +### Download from Hugging Face + +The `-hf` flag provides plug-and-play model downloading directly from Hugging Face: + +```bash +# Download a model by repository (auto-selects best GGUF) +./llama-router -hf bartowski/Qwen3-8B-GGUF + +# Specify a quantization variant +./llama-router -hf bartowski/Qwen3-8B-GGUF:Q4_K_M + +# Or specify the exact filename +./llama-router -hf bartowski/Qwen3-8B-GGUF -hff Qwen3-8B-Q4_K_M.gguf +``` + +**CLI options for downloading:** + +| Flag | Description | +|------|-------------| +| `-hf`, `-hfr`, `--hf-repo` | Hugging Face repository (format: `user/repo` or `user/repo:quant`) | +| `-hff`, `--hf-file` | Specific GGUF filename within the repository | + +**Authentication:** Set the `HF_TOKEN` environment variable for private or gated repositories: + +```bash +export HF_TOKEN="hf_xxxxxxxxxxxxx" +./llama-router -hf meta-llama/Llama-3.1-8B-Instruct-GGUF +``` + +After download completes, the model is cached and will be automatically discovered on the next router startup. If your configuration does not yet specify a `startup_model`, the downloaded model is set as the startup model so it preloads automatically for plug-and-play use. 
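+
+The resulting configuration fragment then looks roughly like this (exact filename and cache path depend on the chosen quantization):
+
+```json
+{
+  "startup_model": "Qwen3-8B-Q4_K_M.gguf",
+  "models": [
+    {
+      "name": "Qwen3-8B-Q4_K_M.gguf",
+      "path": "/home/user/.cache/llama.cpp/bartowski_Qwen3-8B-GGUF_Qwen3-8B-Q4_K_M.gguf",
+      "state": "auto",
+      "group": ""
+    }
+  ]
+}
+```
+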
+ +### Import a Local Collection + +Use `--import-dir` to recursively scan a directory and add all discovered GGUF models to your configuration: + +```bash +# Import all models from a directory +./llama-router --import-dir ~/my-models/ + +# Import from multiple locations (run multiple times) +./llama-router --import-dir /mnt/nas/gguf-collection/ +./llama-router --import-dir ~/experiments/fine-tuned/ +``` + +The import process: +1. Recursively scans the directory for `.gguf` files +2. Excludes files containing "mmproj" in the filename (these are vision projectors) +3. Automatically pairs models with mmproj files found in the same directory +4. Adds new models with state `"manual"` to prevent accidental removal +5. Persists updated configuration to disk + +**Multimodal detection:** If a model like `llava-v1.6-mistral-7b-Q4_K_M.gguf` is found alongside `mmproj-model-f16.gguf` in the same directory, the import will automatically configure the spawn command with `--mmproj`. + +### Other CLI Options + +| Flag | Description | +|------|-------------| +| `-h`, `--help` | Display help message | +| `--config ` | Override config file location | + +--- + +## Model States: `auto` vs `manual` + +The router tracks each model's origin through a `state` field, which controls behavior during rescans: + +### Important: On-Demand Spawning + +Models spawn on demand when first requested via the API. Set `startup_model` to eagerly load a specific model at boot; all other models remain on-demand. The `auto`/`manual` state controls only rescan behavior: + +- `auto`: Managed by cache scanner, inherits `default_spawn` +- `manual`: Protected from rescans, can have custom `spawn` configuration + +### `auto` State + +Models discovered automatically from the Hugging Face cache are marked as `"state": "auto"`. These models: + +- Are added when first discovered in the cache +- Are **removed automatically** if the cached file disappears (e.g., cache cleanup) +- Are re-added if the file reappears +- **Inherit `default_spawn` configuration** - change `default_spawn` to optimize all `auto` models at once + +This enables seamless synchronization with `huggingface-cli` downloads and cache management. + +### `manual` State + +Models added via `--import-dir` or edited by hand in the config are marked as `"state": "manual"`. 
These models: + +- Are **never automatically removed**, even if the file path becomes invalid +- Must be manually deleted from the configuration +- Survive rescans and configuration reloads +- **Can have custom `spawn` configurations** that override `default_spawn` + +**Use cases for manual state:** +- Models on network storage that may be temporarily unavailable +- Fine-tuned models in development directories +- Models you want to persist regardless of file system changes + +--- + +## Configuration + +### File Location + +Default: `~/.config/llama.cpp/router-config.json` + +Override with `--config`: + +```bash +./llama-router --config /etc/llama-router/config.json +``` + +### Configuration Structure + +```json +{ + "version": "1.0", + "startup_model": "", + "router": { + "host": "127.0.0.1", + "port": 8082, + "base_port": 50000, + "connection_timeout_s": 5, + "read_timeout_s": 600, + "admin_token": "", + "notify_model_swap": false + }, + "default_spawn": { + "command": [ + "llama-server", + "-m", "$path", + "--port", "$port", + "--host", "$host", + "--jinja", + "--ctx-size", "4096", + "--n-gpu-layers", "99" + ], + "proxy_endpoints": ["/v1/", "/health", "/slots", "/props"], + "health_endpoint": "/health" + }, + "models": [ + { + "name": "Qwen3-8B-Q4_K_M.gguf", + "path": "/home/user/.cache/llama.cpp/bartowski_Qwen3-8B-GGUF_Qwen3-8B-Q4_K_M.gguf", + "state": "auto", + "group": "" + } + ] +} +``` + +### Router Options + +| Field | Default | Description | +|-------|---------|-------------| +| `host` | `127.0.0.1` | Address to bind the router | +| `port` | `8082` | Port for incoming requests | +| `base_port` | `50000` | Starting port for spawned backends (increments per model) | +| `connection_timeout_s` | `5` | Upstream connection timeout | +| `read_timeout_s` | `600` | Upstream read timeout (long for streaming) | +| `admin_token` | `""` | Bearer token for admin endpoints (empty = no auth) | +| `notify_model_swap` | `false` | Enable real-time SSE notifications during model swaps (shows unload/load/ready progress) | + +### Startup Model + +Use `startup_model` to eagerly launch a specific model when the router boots. The value must match a `name` entry under `models`; otherwise configuration loading fails. All other models remain on-demand and will start when first requested. + +### Default Spawn Configuration + +The `default_spawn` block defines how llama-server instances are launched: + +```json +{ + "command": [ + "llama-server", + "-m", "$path", + "--port", "$port", + "--host", "$host", + "--jinja", + "--ctx-size", "4096", + "--n-gpu-layers", "99" + ], + "proxy_endpoints": ["/v1/", "/health", "/slots", "/props"], + "health_endpoint": "/health" +} +``` + +### Spawn Command Placeholders + +The router supports placeholders in spawn commands for dynamic value injection: + +| Placeholder | Description | Example expansion | +|-------------|-------------|-------------------| +| `$path` | Model file path from configuration | `/home/user/.cache/llama.cpp/model.gguf` | +| `$port` | Dynamically assigned port | `50000`, `50001`, etc. | +| `$host` | Bind address (always `127.0.0.1`) | `127.0.0.1` | + +This makes all spawn parameters explicit and visible in the configuration. + +### Optimizing for Your Hardware + +The `default_spawn` is where you tune performance for your specific hardware. 
**All `auto` models inherit these settings**, so you can optimize once for your entire collection: + +```json +{ + "default_spawn": { + "command": [ + "llama-server", + "-m", "$path", + "--port", "$port", + "--host", "$host", + "-ngl", "999", + "-ctk", "q8_0", + "-ctv", "q8_0", + "-fa", "on", + "--mlock", + "-np", "4", + "-kvu", + "--jinja" + ], + "proxy_endpoints": ["/v1/", "/health", "/slots", "/props"], + "health_endpoint": "/health" + } +} +``` + +**Common optimizations:** +- `-ngl 999`: Offload all layers to GPU +- `-ctk q8_0 -ctv q8_0`: Quantize KV cache to Q8 for lower VRAM usage +- `-fa on`: Enable Flash Attention +- `--mlock`: Lock model in RAM to prevent swapping +- `-np 4`: Process 4 prompts in parallel +- `-kvu`: Use single unified KV buffer for all sequences (also `--kv-unified`) +- `--jinja`: Enable Jinja template support + +Change `default_spawn`, reload the router, and all `auto` models instantly use the new configuration. + +### Per-Model Spawn Override + +Individual models can override the default spawn configuration: + +```json +{ + "name": "llava-v1.6-mistral-7b-Q4_K_M.gguf", + "path": "/path/to/model.gguf", + "state": "manual", + "spawn": { + "command": [ + "llama-server", + "-m", "$path", + "--port", "$port", + "--host", "$host", + "--jinja", + "--ctx-size", "8192", + "--n-gpu-layers", "99", + "--mmproj", "/path/to/mmproj-model-f16.gguf" + ], + "proxy_endpoints": ["/v1/", "/health"], + "health_endpoint": "/health" + } +} +``` + +### Model Groups + +**Default Behavior: Single Model at a Time** + +llama-router is designed for resource-constrained environments (small GPUs, consumer hardware). By default, **only ONE model runs at a time** - when you request a different model, the current one is stopped first. This ensures reliable operation on systems with limited VRAM. + +To allow multiple models to run simultaneously, assign the **same group** to models that can coexist: + +```json +{ + "models": [ + { + "name": "qwen3-8b-q4", + "path": "/path/to/qwen3-8b-q4.gguf", + "group": "small-models" + }, + { + "name": "qwen3-8b-q8", + "path": "/path/to/qwen3-8b-q8.gguf", + "group": "small-models" + }, + { + "name": "llama-70b-q4", + "path": "/path/to/llama-70b-q4.gguf", + "group": "large-model" + } + ] +} +``` + +Behavior: +- Requesting `qwen3-8b-q4` while `qwen3-8b-q8` is running: **no restart** (same group) +- Requesting `llama-70b-q4` while `qwen3-8b-q4` is running: **stops qwen3, starts llama** (different group) + +**Omitting the `group` field creates an exclusive singleton per model** - each model stops all others before starting. 
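+
+Using the configuration above, the swap behavior can be observed with three consecutive requests (sketch; adjust the model names to your own config):
+
+```bash
+BASE=http://localhost:8082/v1/chat/completions
+HDR='Content-Type: application/json'
+
+curl -s $BASE -H "$HDR" -d '{"model":"qwen3-8b-q4","messages":[{"role":"user","content":"hi"}]}'   # starts qwen3-8b-q4
+curl -s $BASE -H "$HDR" -d '{"model":"qwen3-8b-q8","messages":[{"role":"user","content":"hi"}]}'   # same group: qwen3-8b-q4 keeps running
+curl -s $BASE -H "$HDR" -d '{"model":"llama-70b-q4","messages":[{"role":"user","content":"hi"}]}'  # different group: stops the qwen3 models, starts llama
+```
+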
+ +--- + +## API Endpoints + +### OpenAI-Compatible + +| Endpoint | Method | Description | +|----------|--------|-------------| +| `/v1/models` | GET | List all configured models | +| `/v1/chat/completions` | POST | Chat completion (model selected from request body) | + +### Model-Specific Proxies + +| Endpoint | Method | Description | +|----------|--------|-------------| +| `//health` | GET | Health check for specific model | +| `//props` | GET | Model properties | +| `//slots` | GET | Slot information | + +### Last-Spawned Shortcuts + +These endpoints proxy to the most recently spawned model: + +| Endpoint | Method | +|----------|--------| +| `/health` | GET | +| `/props` | GET | +| `/slots` | GET | + +### Admin Endpoints + +Protected by `admin_token` if configured: + +| Endpoint | Method | Description | +|----------|--------|-------------| +| `/admin/reload` | POST | Stop all running models | +| `/admin/rescan` | GET | Rescan cache and update config | + +**Authentication:** + +```bash +# Via Authorization header +curl -H "Authorization: Bearer " http://localhost:8082/admin/rescan + +# Via X-Admin-Token header +curl -H "X-Admin-Token: " http://localhost:8082/admin/rescan +``` + +--- + +## Real-Time Model Swap Notifications + +When `notify_model_swap` is enabled and streaming is active, the router sends progress updates via SSE `delta.reasoning_content` (OpenAI-compatible format) during model lifecycle operations: + +```json +{ + "router": { + "notify_model_swap": true + } +} +``` + +**Notification flow:** +1. `[llama-router] Unloading ModelA (groupX)` - sent during process termination +2. `[llama-router] Loading ModelB (groupY)` - sent during spawn + weight loading +3. `[llama-router] Backend ready, generating response...` - sent after health check + +Each message streams in real-time as the operation executes, providing transparent feedback on swap duration. + +**Example SSE output:** +``` +data: {"choices":[{"delta":{"reasoning_content":"[llama-router] Unloading Qwen3-8B (small-models)\n"},"index":0}]} + +data: {"choices":[{"delta":{"reasoning_content":"[llama-router] Loading Llama-70B (large-model)\n"},"index":0}]} + +data: {"choices":[{"delta":{"reasoning_content":"[llama-router] Backend ready, generating response...\n"},"index":0}]} + +data: {"choices":[{"delta":{"content":"Sure, I'd be happy to help!"},"index":0}]} +``` + +Non-streaming requests receive no swap notifications regardless of config. 
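+
+To watch these notifications from the command line (with `notify_model_swap` enabled), send a streaming request with curl's output buffering disabled via `-N`; the model name is illustrative:
+
+```bash
+curl -N http://localhost:8082/v1/chat/completions \
+  -H "Content-Type: application/json" \
+  -d '{
+    "model": "Llama-70B",
+    "messages": [{"role": "user", "content": "Hello!"}],
+    "stream": true
+  }'
+```
+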
+ +--- + +## Usage Examples + +### Basic Chat Completion + +```bash +curl http://localhost:8082/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "Qwen3-8B-Q4_K_M.gguf", + "messages": [{"role": "user", "content": "Hello!"}] + }' +``` + +### Streaming Response + +```bash +curl http://localhost:8082/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "Qwen3-8B-Q4_K_M.gguf", + "messages": [{"role": "user", "content": "Write a haiku about AI"}], + "stream": true + }' +``` + +### List Available Models + +```bash +curl http://localhost:8082/v1/models | jq '.data[].id' +``` + +### Check Model Health + +```bash +# Specific model +curl http://localhost:8082/Qwen3-8B-Q4_K_M.gguf/health + +# Last spawned model +curl http://localhost:8082/health +``` + +### Force Rescan After Downloads + +```bash +# Download a new model +./llama-router -hf TheBloke/Mistral-7B-v0.1-GGUF:Q4_K_M + +# Rescan without restarting +curl http://localhost:8082/admin/rescan +``` + +--- + +## Troubleshooting + +### Model Not Found + +``` +{"error":"model not found"} +``` + +Check that the model name in your request matches exactly the `name` field in the configuration. Use `/v1/models` to list available names. + +### Backend Not Ready + +``` +Backend for X did not become ready on port Y within 10000 ms +``` + +Possible causes: +- Model file corrupted or incompatible +- Insufficient GPU memory +- llama-server crashed during load + +Check the router's stdout for llama-server output. + +### Port Conflicts + +If `base_port` conflicts with other services, change it in the configuration: + +```json +{ + "router": { + "base_port": 60000 + } +} +``` + +### Permission Denied on Config + +Ensure the config directory exists and is writable: + +```bash +mkdir -p ~/.config/llama.cpp +``` + +--- + +## Documentation + +For detailed technical documentation, design decisions, and contributing guidelines, see [ARCHITECTURE.md](ARCHITECTURE.md). + +--- + +## Contributing + +llama-router is part of the llama.cpp project. Contributions welcome via pull request. + +## License + +MIT License - See llama.cpp repository for details. diff --git a/tools/router/logging.cpp b/tools/router/logging.cpp new file mode 100644 index 000000000..c4adca7a6 --- /dev/null +++ b/tools/router/logging.cpp @@ -0,0 +1,20 @@ +#include "logging.h" + +void router_log_init() { + common_log_set_verbosity_thold(LOG_DEFAULT_DEBUG); + + // Always reset to console-only logging for the router tool. This mirrors + // the llama-server defaults (stdout for info, stderr otherwise) and keeps + // formatting consistent with llama.cpp so router diagnostics and the core + // library share the same console sink. + auto * log = common_log_main(); + common_log_set_file(log, nullptr); + common_log_set_colors(log, LOG_COLORS_AUTO); + common_log_set_prefix(log, false); + common_log_set_timestamps(log, false); + + // Forward llama.cpp internals through the common logger so any backend + // message (tokenization, sampling, etc.) shows up alongside router + // messages without a second logging pipeline. + llama_log_set(common_log_default_callback, nullptr); +} diff --git a/tools/router/logging.h b/tools/router/logging.h new file mode 100644 index 000000000..ab8739323 --- /dev/null +++ b/tools/router/logging.h @@ -0,0 +1,12 @@ +#pragma once + +#include "log.h" +#include "llama.h" + +// Initialize the shared logger for the router tool. 
This routes llama.cpp +// internal logs through the common logger and ensures they only go to +// stdout/stderr (no log files are created). We mirror the llama-server +// defaults directly (console-only with auto colors, no prefix or timestamp), +// so no runtime JSON option is required. +void router_log_init(); + diff --git a/tools/router/router-admin.cpp b/tools/router/router-admin.cpp new file mode 100644 index 000000000..e4461877d --- /dev/null +++ b/tools/router/router-admin.cpp @@ -0,0 +1,61 @@ +#include "router-admin.h" + +#include "log.h" +#include "router-config.h" + +#include + +using json = nlohmann::json; + +static bool authorize_admin(const RouterConfig & cfg, const httplib::Request & req, httplib::Response & res) { + if (cfg.router.admin_token.empty()) { + return true; + } + + const std::string bearer = "Bearer " + cfg.router.admin_token; + const auto auth = req.get_header_value("Authorization"); + const auto token = req.get_header_value("X-Admin-Token"); + + if (auth == bearer || token == cfg.router.admin_token) { + return true; + } + + res.status = 403; + res.set_content("{\"error\":\"forbidden\"}", "application/json"); + LOG_WRN("Admin endpoint rejected unauthorized request from %s:%d\n", req.remote_addr.c_str(), req.remote_port); + return false; +} + +void register_admin_routes(httplib::Server & server, RouterApp & app, const std::string & config_path) { + server.Post("/admin/reload", [&app](const httplib::Request & req, httplib::Response & res) { + if (!authorize_admin(app.get_config(), req, res)) { + return; + } + LOG_INF("Reloading router application: stopping managed models\n"); + app.stop_all(); + res.set_content("{\"status\":\"reloaded\"}", "application/json"); + }); + + server.Get("/admin/rescan", [&app, config_path](const httplib::Request & req, httplib::Response & res) { + if (!authorize_admin(app.get_config(), req, res)) { + return; + } + + const auto rescan_result = rescan_auto_models(app.get_config()); + LOG_INF("Admin rescan requested, found %zu new models (removed %zu)\n", + rescan_result.added, + rescan_result.removed); + app.update_config(rescan_result.config); + + if (!config_path.empty() && (rescan_result.added > 0 || rescan_result.removed > 0)) { + LOG_INF("Persisting updated configuration to %s\n", config_path.c_str()); + write_config_file(app.get_config(), config_path); + } + + json out; + out["status"] = "rescanned"; + out["new_models"] = rescan_result.added; + out["removed"] = rescan_result.removed; + res.set_content(out.dump(), "application/json"); + }); +} diff --git a/tools/router/router-admin.h b/tools/router/router-admin.h new file mode 100644 index 000000000..2148f51ca --- /dev/null +++ b/tools/router/router-admin.h @@ -0,0 +1,7 @@ +#pragma once + +#include "router-app.h" + +#include + +void register_admin_routes(httplib::Server & server, RouterApp & app, const std::string & config_path = std::string()); diff --git a/tools/router/router-app.cpp b/tools/router/router-app.cpp new file mode 100644 index 000000000..2d924c83b --- /dev/null +++ b/tools/router/router-app.cpp @@ -0,0 +1,191 @@ +#include "router-app.h" + +#include "log.h" +#include "router-constants.h" +#include "router-config.h" +#include "router-process.h" + +#include +#include + +RouterApp::RouterApp(RouterConfig cfg) + : config(std::move(cfg)), next_port(config.router.base_port) { + for (const auto & model : config.models) { + model_lookup.emplace(model.name, model); + } +} + +RouterApp::~RouterApp() { stop_all(); } + +SpawnConfig RouterApp::resolve_spawn_config(const ModelConfig & 
cfg) const { + return is_spawn_empty(cfg.spawn) ? config.default_spawn : cfg.spawn; +} + +SpawnConfig RouterApp::get_spawn_config(const std::string & model_name) { + std::lock_guard lock(mutex); + auto it = model_lookup.find(model_name); + if (it == model_lookup.end()) { + return config.default_spawn; + } + return resolve_spawn_config(it->second); +} + +bool RouterApp::ensure_running(const std::string & model_name, std::string & error) { + std::lock_guard lock(mutex); + auto it_cfg = model_lookup.find(model_name); + if (it_cfg == model_lookup.end()) { + error = "model not found"; + return false; + } + + const ModelConfig & cfg = it_cfg->second; + const std::string target_group = get_model_group(cfg); + + for (auto it_proc = processes.begin(); it_proc != processes.end();) { + const auto it_model = model_lookup.find(it_proc->first); + const bool unknown = it_model == model_lookup.end(); + const std::string running_group = unknown ? std::string() : get_model_group(it_model->second); + + if (!unknown && running_group == target_group) { + ++it_proc; + continue; + } + + LOG_INF("Stopping %s (group '%s') to spawn %s (group '%s')\n", + it_proc->first.c_str(), + running_group.c_str(), + model_name.c_str(), + target_group.c_str()); + + terminate_process(it_proc->second); + notify_progress("[llama-router] Unloading " + it_proc->first + " (" + running_group + ")\n"); + wait_for_process_exit(it_proc->second, ROUTER_PROCESS_SHUTDOWN_TIMEOUT_MS); + model_ports.erase(it_proc->first); + it_proc = processes.erase(it_proc); + } + + auto it = processes.find(model_name); + if (it != processes.end() && process_running(it->second)) { + LOG_DBG("Model %s already running on port %d\n", model_name.c_str(), model_ports[model_name]); + return true; + } + + if (it != processes.end()) { + close_process(it->second); + processes.erase(it); + model_ports.erase(model_name); + } + + int port = next_port.fetch_add(1); + model_ports[model_name] = port; + + const SpawnConfig spawn_cfg = resolve_spawn_config(cfg); + + std::vector command = spawn_cfg.command; + const std::string model_path = expand_user_path(cfg.path); + + // Replace all placeholders + for (auto & arg : command) { + if (arg == "$path") { + arg = model_path; + } else if (arg == "$port") { + arg = std::to_string(port); + } else if (arg == "$host") { + arg = "127.0.0.1"; + } + } + + LOG_INF("Starting %s on port %d\n", model_name.c_str(), port); + + ProcessHandle handle = spawn_process(command); + if (!process_running(handle)) { + error = "failed to start process"; + LOG_ERR("Failed to start %s on port %d: %s\n", model_name.c_str(), port, error.c_str()); + terminate_process(handle); + return false; + } + + auto [proc_it, _] = processes.emplace(model_name, std::move(handle)); + last_spawned_model = model_name; + LOG_INF("Spawned %s (group '%s') with %zu args\n", model_name.c_str(), target_group.c_str(), command.size()); + + notify_progress("[llama-router] Loading " + model_name + " (" + target_group + ")\n"); + + const std::string health_endpoint = spawn_cfg.health_endpoint.empty() ? 
"/health" : spawn_cfg.health_endpoint; + if (!wait_for_backend_ready(port, health_endpoint, ROUTER_BACKEND_READY_TIMEOUT_MS, &proc_it->second)) { + error = "backend not ready"; + LOG_ERR("Backend for %s did not become ready on port %d within %d ms\n", + model_name.c_str(), + port, + ROUTER_BACKEND_READY_TIMEOUT_MS); + terminate_process(proc_it->second); + processes.erase(proc_it); + model_ports.erase(model_name); + return false; + } + + LOG_INF("Backend ready on port %d\n", port); + notify_progress("[llama-router] Backend ready, generating response...\n"); + return true; +} + +std::string RouterApp::upstream_for(const std::string & model_name) { + std::lock_guard lock(mutex); + auto it = model_ports.find(model_name); + if (it == model_ports.end()) { + return {}; + } + std::ostringstream os; + os << "http://127.0.0.1:" << it->second; + return os.str(); +} + +std::string RouterApp::get_last_spawned_model() { + std::lock_guard lock(mutex); + return last_spawned_model; +} + +void RouterApp::update_config(RouterConfig cfg) { + std::lock_guard lock(mutex); + config = std::move(cfg); + + model_lookup.clear(); + for (const auto & model : config.models) { + model_lookup.emplace(model.name, model); + } + + if (!model_lookup.count(last_spawned_model)) { + last_spawned_model.clear(); + } + + const int desired_base = config.router.base_port; + if (desired_base > next_port.load()) { + next_port.store(desired_base); + } +} + +void RouterApp::stop_all() { + std::lock_guard lock(mutex); + for (auto & kv : processes) { + LOG_INF("Stopping managed model %s\n", kv.first.c_str()); + terminate_process(kv.second); + } + processes.clear(); +} + +void RouterApp::set_notification_sink(NotificationSink sink) { + std::lock_guard lock(notification_mutex); + notification_sink = std::move(sink); +} + +void RouterApp::clear_notification_sink() { + std::lock_guard lock(notification_mutex); + notification_sink.reset(); +} + +void RouterApp::notify_progress(const std::string & message) { + std::lock_guard lock(notification_mutex); + if (notification_sink) { + (*notification_sink)(ProgressNotification{message}); + } +} diff --git a/tools/router/router-app.h b/tools/router/router-app.h new file mode 100644 index 000000000..66a73f6bb --- /dev/null +++ b/tools/router/router-app.h @@ -0,0 +1,43 @@ +#pragma once + +#include "router-config.h" +#include "router-process.h" + +#include +#include +#include +#include +#include +#include + +class RouterApp { +public: + explicit RouterApp(RouterConfig cfg); + ~RouterApp(); + + bool ensure_running(const std::string & model_name, std::string & error); + std::string upstream_for(const std::string & model_name); + std::string get_last_spawned_model(); + SpawnConfig get_spawn_config(const std::string & model_name); + void stop_all(); + void update_config(RouterConfig cfg); + + void set_notification_sink(NotificationSink sink); + void clear_notification_sink(); + + const RouterConfig & get_config() const { return config; } + +private: + RouterConfig config; + std::atomic next_port; + std::mutex mutex; + std::optional notification_sink; + std::mutex notification_mutex; + std::unordered_map model_lookup; + std::unordered_map processes; + std::unordered_map model_ports; + std::string last_spawned_model; + + SpawnConfig resolve_spawn_config(const ModelConfig & cfg) const; + void notify_progress(const std::string & message); +}; diff --git a/tools/router/router-config.cpp b/tools/router/router-config.cpp new file mode 100644 index 000000000..6bfbb6437 --- /dev/null +++ 
b/tools/router/router-config.cpp @@ -0,0 +1,395 @@ +#include "router-config.h" + +#include "common.h" +#include "log.h" +#include "router-scanner.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if defined(_WIN32) +# define WIN32_LEAN_AND_MEAN +# ifndef NOMINMAX +# define NOMINMAX +# endif +# include +#elif defined(__APPLE__) +# include +#else +# include +#endif + +using json = nlohmann::json; + +static std::string detect_llama_server_binary() { +#if defined(_WIN32) + std::vector buffer(MAX_PATH); + DWORD len = 0; + while (true) { + len = GetModuleFileNameA(nullptr, buffer.data(), static_cast(buffer.size())); + if (len == 0) { + return std::string(); + } + if (len < buffer.size()) { + break; + } + buffer.resize(buffer.size() * 2); + } + + std::filesystem::path path(buffer.begin(), buffer.begin() + static_cast(len)); + return (path.parent_path() / "llama-server.exe").string(); +#elif defined(__linux__) + std::vector buffer(1024); + while (true) { + ssize_t len = readlink("/proc/self/exe", buffer.data(), buffer.size() - 1); + if (len < 0) { + return std::string(); + } + if (static_cast(len) < buffer.size() - 1) { + buffer[len] = '\0'; + break; + } + buffer.resize(buffer.size() * 2); + } + + std::filesystem::path path(buffer.data()); + return (path.parent_path() / "llama-server").string(); +#elif defined(__APPLE__) + std::vector buffer(PATH_MAX); + uint32_t size = static_cast(buffer.size()); + if (_NSGetExecutablePath(buffer.data(), &size) != 0) { + buffer.resize(size); + size = static_cast(buffer.size()); + if (_NSGetExecutablePath(buffer.data(), &size) != 0) { + return std::string(); + } + } + + std::filesystem::path path(buffer.data()); + return (path.parent_path() / "llama-server").string(); +#else + return std::string(); +#endif +} + +static SpawnConfig parse_spawn_config(const json & data) { + SpawnConfig spawn; + if (data.contains("command")) { + spawn.command = data["command"].get>(); + } + if (data.contains("proxy_endpoints")) { + spawn.proxy_endpoints = data["proxy_endpoints"].get>(); + } + if (data.contains("health_endpoint")) { + spawn.health_endpoint = data["health_endpoint"].get(); + } + + return spawn; +} + +static json serialize_spawn_config(const SpawnConfig & spawn) { + json obj; + obj["command"] = spawn.command; + obj["proxy_endpoints"] = spawn.proxy_endpoints; + obj["health_endpoint"] = spawn.health_endpoint; + return obj; +} + +const SpawnConfig & get_default_spawn() { + static const SpawnConfig spawn = [] { + SpawnConfig default_spawn = { + /*command =*/ {"llama-server", "-m", "$path", "--port", "$port", "--host", "$host", "--jinja", "--ctx-size", "4096", "--n-gpu-layers", "99"}, + /*proxy_endpoints =*/ {"/v1/", "/health", "/slots", "/props"}, + /*health_endpoint =*/ "/health", + }; + + std::error_code ec; + const std::string detected_path = detect_llama_server_binary(); + if (!detected_path.empty() && std::filesystem::exists(detected_path, ec) && !ec) { + LOG_INF("Detected llama-server at %s\n", detected_path.c_str()); + default_spawn.command[0] = detected_path; + } else { + LOG_INF("Falling back to llama-server resolved via PATH\n"); + } + + return default_spawn; + }(); + + return spawn; +} + +const RouterOptions & get_default_router_options() { + static const RouterOptions opts = { + /*host =*/ "127.0.0.1", + /*port =*/ 8082, + /*base_port =*/ 50000, + /*connection_timeout_s =*/ 5, + /*read_timeout_s =*/ 600, + /*admin_token =*/ "", + /*notify_model_swap =*/ false, + }; + + return opts; +} + +std::string 
get_default_config_path() { + const char * home = std::getenv("HOME"); +#if defined(_WIN32) + if (home == nullptr) { + home = std::getenv("USERPROFILE"); + } +#endif + std::string base = home ? std::string(home) : std::string(); + if (!base.empty() && base.back() != DIRECTORY_SEPARATOR) { + base.push_back(DIRECTORY_SEPARATOR); + } + return base + ".config" + DIRECTORY_SEPARATOR + "llama.cpp" + DIRECTORY_SEPARATOR + "router-config.json"; +} + +std::string expand_user_path(const std::string & path) { + if (path.size() >= 2 && path[0] == '~' && path[1] == '/') { + const char * home = std::getenv("HOME"); +#if defined(_WIN32) + if (home == nullptr) { + home = std::getenv("USERPROFILE"); + } +#endif + if (home != nullptr) { + return std::string(home) + path.substr(1); + } + } + return path; +} + +static void ensure_parent_directory(const std::string & path) { + std::filesystem::path p(path); + std::error_code ec; + auto parent = p.parent_path(); + if (!parent.empty() && !std::filesystem::exists(parent, ec)) { + std::filesystem::create_directories(parent, ec); + } +} + +void write_config_file(const RouterConfig & cfg, const std::string & path) { + json out; + out["version"] = cfg.version; + out["default_spawn"] = serialize_spawn_config(cfg.default_spawn); + out["router"] = {{"host", cfg.router.host}, + {"port", cfg.router.port}, + {"base_port", cfg.router.base_port}, + {"connection_timeout_s", cfg.router.connection_timeout_s}, + {"read_timeout_s", cfg.router.read_timeout_s}}; + + out["startup_model"] = cfg.startup_model; + + if (!cfg.router.admin_token.empty()) { + out["router"]["admin_token"] = cfg.router.admin_token; + } + + if (cfg.router.notify_model_swap) { + out["router"]["notify_model_swap"] = true; + } + + out["models"] = json::array(); + for (const auto & m : cfg.models) { + json obj; + obj["name"] = m.name; + obj["path"] = m.path; + obj["state"] = m.state.empty() ? 
"manual" : m.state; + if (!m.group.empty()) { + obj["group"] = m.group; + } + if (!is_spawn_empty(m.spawn)) { + obj["spawn"] = serialize_spawn_config(m.spawn); + } + out["models"].push_back(std::move(obj)); + } + + ensure_parent_directory(path); + + std::ofstream fout(path); + if (!fout) { + throw std::runtime_error("failed to write config file: " + path); + } + fout << out.dump(4) << std::endl; +} + +RouterConfig generate_default_config(const std::string & path) { + RouterConfig cfg; + cfg.version = "1.0"; + cfg.default_spawn = get_default_spawn(); + cfg.router = get_default_router_options(); + cfg.startup_model = ""; + cfg.models = scan_default_models(); + + LOG_INF("Discovered %zu default models while generating config\n", cfg.models.size()); + write_config_file(cfg, path); + LOG_INF("generated default config at %s\n", path.c_str()); + return cfg; +} + +RescanResult rescan_auto_models(const RouterConfig & existing) { + RescanResult result; + result.config = existing; + + RouterConfig & merged = result.config; + + std::unordered_map existing_paths; + for (size_t i = 0; i < existing.models.size(); ++i) { + existing_paths.emplace(expand_user_path(existing.models[i].path), i); + } + + auto scanned = scan_default_models(); + std::unordered_set scanned_paths; + for (auto & scanned_model : scanned) { + const auto expanded = expand_user_path(scanned_model.path); + scanned_paths.insert(expanded); + auto it = existing_paths.find(expanded); + if (it != existing_paths.end()) { + const auto & existing_model = existing.models[it->second]; + if (existing_model.state == "manual") { + continue; + } + + continue; + } + + if (scanned_model.state.empty()) { + scanned_model.state = "auto"; + } + merged.models.push_back(std::move(scanned_model)); + existing_paths.emplace(expanded, merged.models.size() - 1); + ++result.added; + } + + std::vector filtered; + filtered.reserve(merged.models.size()); + for (const auto & model : merged.models) { + if (model.state == "manual") { + filtered.push_back(model); + continue; + } + + const auto expanded = expand_user_path(model.path); + const auto found = scanned_paths.count(expanded) > 0; + if (found) { + filtered.push_back(model); + } else { + ++result.removed; + LOG_INF("Removing auto model (no longer in cache): %s\n", model.name.c_str()); + } + } + merged.models = std::move(filtered); + + return result; +} + +RouterConfig load_config(const std::string & path) { + RouterConfig cfg; + cfg.router = get_default_router_options(); + cfg.default_spawn = get_default_spawn(); + std::error_code ec; + if (!std::filesystem::exists(path, ec) || ec) { + LOG_WRN("Config file %s missing or inaccessible (ec=%d). Generating default.\n", path.c_str(), ec ? 
ec.value() : 0); + return generate_default_config(path); + } + + std::ifstream fin(path); + if (!fin) { + throw std::runtime_error("failed to open config file: " + path); + } + + json data = json::parse(fin); + LOG_INF("Loaded config file %s\n", path.c_str()); + if (data.contains("version")) { + cfg.version = data["version"].get(); + } + if (data.contains("default_spawn")) { + cfg.default_spawn = parse_spawn_config(data["default_spawn"]); + } + if (data.contains("router")) { + auto r = data["router"]; + if (r.contains("host")) cfg.router.host = r["host"].get(); + if (r.contains("port")) cfg.router.port = r["port"].get(); + if (r.contains("base_port")) cfg.router.base_port = r["base_port"].get(); + if (r.contains("connection_timeout_s")) cfg.router.connection_timeout_s = r["connection_timeout_s"].get(); + if (r.contains("read_timeout_s")) cfg.router.read_timeout_s = r["read_timeout_s"].get(); + if (r.contains("admin_token")) cfg.router.admin_token = r["admin_token"].get(); + if (r.contains("notify_model_swap")) cfg.router.notify_model_swap = r["notify_model_swap"].get(); + } + if (data.contains("startup_model")) { + cfg.startup_model = data["startup_model"].get(); + } + if (data.contains("models")) { + for (const auto & m : data["models"]) { + ModelConfig mc; + mc.name = m.value("name", ""); + mc.path = m.value("path", ""); + mc.state = m.value("state", "manual"); + mc.group = m.value("group", ""); + if (m.contains("spawn")) { + mc.spawn = parse_spawn_config(m["spawn"]); + } + cfg.models.push_back(std::move(mc)); + } + } + LOG_INF("Config parsed: %zu models, router port %d, base port %d\n", cfg.models.size(), cfg.router.port, cfg.router.base_port); + + const auto rescan_result = rescan_auto_models(cfg); + cfg = rescan_result.config; + LOG_INF("Rescanned models, found %zu new auto models (removed %zu)\n", rescan_result.added, rescan_result.removed); + + const auto validate_port = [&](int port, const std::string & name) { + if (port <= 0 || port > 65535) { + throw std::runtime_error("invalid " + name + " port in config: " + std::to_string(port)); + } + }; + + validate_port(cfg.router.port, "router"); + validate_port(cfg.router.base_port, "base"); + bool startup_found = cfg.startup_model.empty(); + + for (const auto & model : cfg.models) { + if (model.name.empty()) { + throw std::runtime_error("model entry missing name"); + } + + const std::string path_to_check = expand_user_path(model.path); + if (!std::filesystem::exists(path_to_check, ec)) { + throw std::runtime_error("model path does not exist: " + path_to_check); + } + + const SpawnConfig & spawn = is_spawn_empty(model.spawn) ? cfg.default_spawn : model.spawn; + if (spawn.command.empty()) { + throw std::runtime_error("spawn command missing for model: " + model.name); + } + + if (!startup_found && model.name == cfg.startup_model) { + startup_found = true; + } + } + + if (!startup_found) { + throw std::runtime_error("startup_model not found in configured models: " + cfg.startup_model); + } + + if (rescan_result.added > 0 || rescan_result.removed > 0) { + LOG_INF("Persisting updated configuration after rescan (added %zu, removed %zu)\n", rescan_result.added, rescan_result.removed); + write_config_file(cfg, path); + } + + return cfg; +} + +std::string get_model_group(const ModelConfig & cfg) { + return cfg.group.empty() ? 
cfg.name : cfg.group; +} diff --git a/tools/router/router-config.h b/tools/router/router-config.h new file mode 100644 index 000000000..35a530271 --- /dev/null +++ b/tools/router/router-config.h @@ -0,0 +1,66 @@ +#pragma once + +#include +#include +#include +#include + +struct ProgressNotification { + std::string message; +}; + +using NotificationSink = std::function; + +struct SpawnConfig { + std::vector command; + std::vector proxy_endpoints; + std::string health_endpoint; +}; + +inline bool is_spawn_empty(const SpawnConfig & spawn) { + return spawn.command.empty() && spawn.proxy_endpoints.empty() && spawn.health_endpoint.empty(); +} + +struct ModelConfig { + std::string name; + std::string path; + std::string state; + std::string group; + SpawnConfig spawn; +}; + +struct RouterOptions { + std::string host; + int port = 0; + int base_port = 0; + int connection_timeout_s = 5; + int read_timeout_s = 600; + std::string admin_token; + bool notify_model_swap = false; +}; + +struct RouterConfig { + std::string version; + SpawnConfig default_spawn; + RouterOptions router; + std::string startup_model; + std::vector models; +}; + +struct RescanResult { + RouterConfig config; + size_t added = 0; + size_t removed = 0; +}; + +std::string get_default_config_path(); +std::string expand_user_path(const std::string & path); +const SpawnConfig & get_default_spawn(); +const RouterOptions & get_default_router_options(); + +RouterConfig load_config(const std::string & path); +RouterConfig generate_default_config(const std::string & path); +void write_config_file(const RouterConfig & cfg, const std::string & path); +RescanResult rescan_auto_models(const RouterConfig & existing); + +std::string get_model_group(const ModelConfig & cfg); diff --git a/tools/router/router-constants.h b/tools/router/router-constants.h new file mode 100644 index 000000000..e1887e8a4 --- /dev/null +++ b/tools/router/router-constants.h @@ -0,0 +1,13 @@ +#pragma once + +// Polling interval in milliseconds used for backend health checks during startup. +#define ROUTER_BACKEND_HEALTH_POLL_MS 100 + +// Polling interval in milliseconds used for process exit checks and shutdown loops. +#define ROUTER_PROCESS_POLL_INTERVAL_MS 100 + +// Maximum time in milliseconds to wait for a process to terminate gracefully before forcing shutdown. +#define ROUTER_PROCESS_SHUTDOWN_TIMEOUT_MS 2000 + +// Maximum time in milliseconds to wait for a backend to report readiness. 
+#define ROUTER_BACKEND_READY_TIMEOUT_MS 60000 diff --git a/tools/router/router-endpoints.cpp b/tools/router/router-endpoints.cpp new file mode 100644 index 000000000..c147c54fa --- /dev/null +++ b/tools/router/router-endpoints.cpp @@ -0,0 +1,105 @@ +#include "router-endpoints.h" + +#include "log.h" +#include "router-app.h" +#include "router-proxy.h" + +#include "index.html.gz.hpp" + +#include + +#include +#include + +using json = nlohmann::json; + +static void handle_models(const RouterApp & app, httplib::Response & res) { + json out; + out["object"] = "list"; + out["data"] = json::array(); + auto now = static_cast(time(nullptr)); + for (const auto & model : app.get_config().models) { + out["data"].push_back({{"id", model.name}, {"object", "model"}, {"owned_by", "router"}, {"created", now}}); + } + LOG_INF("Listing %zu models\n", out["data"].size()); + res.set_content(out.dump(), "application/json"); +} + +static bool parse_model_from_chat(const httplib::Request & req, std::string & model) { + json body; + try { + body = json::parse(req.body); + } catch (const std::exception &) { + return false; + } + + model = body.value("model", std::string()); + return !model.empty(); +} + +void register_routes(httplib::Server & server, RouterApp & app) { + auto serve_index = [](const httplib::Request & req, httplib::Response & res) { + if (req.get_header_value("Accept-Encoding").find("gzip") == std::string::npos) { + res.set_content("Error: gzip is not supported by this browser", "text/plain"); + return; + } + + res.set_header("Content-Encoding", "gzip"); + res.set_header("Cross-Origin-Embedder-Policy", "require-corp"); + res.set_header("Cross-Origin-Opener-Policy", "same-origin"); + res.set_content(reinterpret_cast(index_html_gz), index_html_gz_len, "text/html; charset=utf-8"); + }; + + server.Get("/", serve_index); + server.Get("/index.html", serve_index); + + server.Get("/v1/models", [&app](const httplib::Request &, httplib::Response & res) { handle_models(app, res); }); + + auto proxy_last_spawned = [&app](const httplib::Request & req, httplib::Response & res) { + const std::string model = app.get_last_spawned_model(); + if (model.empty()) { + LOG_WRN("No last spawned model available for %s\n", req.path.c_str()); + res.status = 503; + res.set_content("no models running", "text/plain"); + return; + } + LOG_INF("Proxying %s to last spawned model %s\n", req.path.c_str(), model.c_str()); + const auto spawn_cfg = app.get_spawn_config(model); + proxy_request(req, res, app, model, spawn_cfg.proxy_endpoints); + }; + + server.Get("/props", proxy_last_spawned); + server.Get("/slots", proxy_last_spawned); + server.Get("/health", proxy_last_spawned); + + server.Get(R"(^/(.+)/(health|props|slots)$)", [&app](const httplib::Request & req, httplib::Response & res) { + auto model_it = req.matches.begin(); + ++model_it; + std::string model_name = model_it != req.matches.end() ? model_it->str() : std::string(); + ++model_it; + const std::string endpoint_suffix = model_it != req.matches.end() ? 
model_it->str() : std::string(); + LOG_INF("Proxying %s for model %s\n", req.path.c_str(), model_name.c_str()); + const auto spawn_cfg = app.get_spawn_config(model_name); + const std::string corrected_path = "/" + endpoint_suffix; + proxy_request(req, res, app, model_name, spawn_cfg.proxy_endpoints, corrected_path); + }); + + server.Post("/v1/chat/completions", [&app](const httplib::Request & req, httplib::Response & res) { + std::string model; + if (!parse_model_from_chat(req, model)) { + LOG_WRN("Chat completion request missing model field\n"); + res.status = 400; + res.set_content("{\"error\":\"invalid json or model missing\"}", "application/json"); + return; + } + + LOG_INF("Proxying chat completion for model %s\n", model.c_str()); + const auto spawn_cfg = app.get_spawn_config(model); + proxy_request(req, res, app, model, spawn_cfg.proxy_endpoints); + }); + + server.set_error_handler([](const httplib::Request &, httplib::Response & res) { + res.status = 404; + res.set_content("{\"error\":\"not found\"}", "application/json"); + }); +} diff --git a/tools/router/router-endpoints.h b/tools/router/router-endpoints.h new file mode 100644 index 000000000..c494a43bf --- /dev/null +++ b/tools/router/router-endpoints.h @@ -0,0 +1,7 @@ +#pragma once + +#include "router-app.h" + +#include + +void register_routes(httplib::Server & server, RouterApp & app); diff --git a/tools/router/router-process.cpp b/tools/router/router-process.cpp new file mode 100644 index 000000000..a6984733f --- /dev/null +++ b/tools/router/router-process.cpp @@ -0,0 +1,372 @@ +#include "router-constants.h" +#include "router-process.h" + +#include "log.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if !defined(_WIN32) +# include +# include +# include +#endif + +bool process_running(const ProcessHandle & handle) { +#if defined(_WIN32) + if (!handle.valid) { + return false; + } + DWORD status = WaitForSingleObject(handle.proc_info.hProcess, 0); + return status == WAIT_TIMEOUT; +#else + if (handle.pid <= 0) { + return false; + } + + int status = 0; + auto r = waitpid(handle.pid, &status, WNOHANG); + if (r == handle.pid) { + return false; + } + + return kill(handle.pid, 0) == 0; +#endif +} + +bool wait_for_process_exit(const ProcessHandle & handle, int timeout_ms) { +#if defined(_WIN32) + if (!handle.valid) { + return true; + } + return WaitForSingleObject(handle.proc_info.hProcess, timeout_ms) == WAIT_OBJECT_0; +#else + if (handle.pid <= 0) { + return true; + } + + const auto start = std::chrono::steady_clock::now(); + while (true) { + int status = 0; + auto r = waitpid(handle.pid, &status, WNOHANG); + if (r == handle.pid) { + return true; + } + auto elapsed = std::chrono::steady_clock::now() - start; + if (std::chrono::duration_cast(elapsed).count() > timeout_ms) { + return false; + } + std::this_thread::sleep_for(std::chrono::milliseconds(ROUTER_PROCESS_POLL_INTERVAL_MS)); + } +#endif +} + +void close_process(ProcessHandle & handle) { +#if defined(_WIN32) + if (!handle.valid) { + return; + } + if (handle.stdout_thread.joinable()) { + handle.stdout_thread.join(); + } + if (handle.stderr_thread.joinable()) { + handle.stderr_thread.join(); + } + if (handle.stdout_read != nullptr) { + CloseHandle(handle.stdout_read); + handle.stdout_read = nullptr; + } + if (handle.stderr_read != nullptr) { + CloseHandle(handle.stderr_read); + handle.stderr_read = nullptr; + } + WaitForSingleObject(handle.proc_info.hProcess, 0); + CloseHandle(handle.proc_info.hThread); + 
CloseHandle(handle.proc_info.hProcess); + handle.valid = false; +#else + if (handle.pid > 0) { + if (handle.stdout_thread.joinable()) { + handle.stdout_thread.join(); + } + if (handle.stderr_thread.joinable()) { + handle.stderr_thread.join(); + } + if (handle.stdout_fd != -1) { + close(handle.stdout_fd); + handle.stdout_fd = -1; + } + if (handle.stderr_fd != -1) { + close(handle.stderr_fd); + handle.stderr_fd = -1; + } + + LOG_DBG("Closing process pid=%d\n", static_cast(handle.pid)); + int status = 0; + waitpid(handle.pid, &status, WNOHANG); + handle.pid = -1; + } +#endif +} + +void terminate_process(ProcessHandle & handle) { +#if defined(_WIN32) + if (!handle.valid) { + return; + } + TerminateProcess(handle.proc_info.hProcess, 1); + close_process(handle); +#else + if (handle.pid <= 0) { + return; + } + LOG_WRN("Sending SIGTERM to pid=%d\n", static_cast(handle.pid)); + kill(handle.pid, SIGTERM); + if (!wait_for_process_exit(handle, ROUTER_PROCESS_SHUTDOWN_TIMEOUT_MS)) { + LOG_ERR("Process pid=%d did not terminate, sending SIGKILL\n", static_cast(handle.pid)); + kill(handle.pid, SIGKILL); + while (process_running(handle)) { + std::this_thread::sleep_for(std::chrono::milliseconds(ROUTER_PROCESS_POLL_INTERVAL_MS)); + } + } + close_process(handle); +#endif +} + +ProcessHandle spawn_process(const std::vector & args) { + ProcessHandle handle; + if (args.empty()) { + LOG_ERR("spawn_process called with empty args\n"); + return handle; + } + + const std::string binary = args[0]; + const bool has_path_separator = binary.find('/') != std::string::npos || binary.find('\\') != std::string::npos; + std::error_code ec; + if (has_path_separator && !std::filesystem::exists(binary, ec)) { + LOG_ERR("Binary not found: %s\n", binary.c_str()); + return handle; + } + +#if defined(_WIN32) + std::ostringstream cmdline; + for (size_t i = 0; i < args.size(); ++i) { + if (i > 0) { + cmdline << ' '; + } + const std::string & part = args[i]; + if (part.find(' ') != std::string::npos) { + cmdline << '"' << part << '"'; + } else { + cmdline << part; + } + } + + LOG_INF("Spawn command: %s\n", cmdline.str().c_str()); + + SECURITY_ATTRIBUTES sa{}; + sa.nLength = sizeof(sa); + sa.bInheritHandle = TRUE; + sa.lpSecurityDescriptor = nullptr; + + HANDLE stdout_read = nullptr, stdout_write = nullptr; + HANDLE stderr_read = nullptr, stderr_write = nullptr; + + if (!CreatePipe(&stdout_read, &stdout_write, &sa, 0) || !CreatePipe(&stderr_read, &stderr_write, &sa, 0)) { + LOG_ERR("pipe creation failed while spawning %s\n", args[0].c_str()); + if (stdout_read) CloseHandle(stdout_read); + if (stdout_write) CloseHandle(stdout_write); + if (stderr_read) CloseHandle(stderr_read); + if (stderr_write) CloseHandle(stderr_write); + return handle; + } + + SetHandleInformation(stdout_read, HANDLE_FLAG_INHERIT, 0); + SetHandleInformation(stderr_read, HANDLE_FLAG_INHERIT, 0); + + STARTUPINFOA si{}; + PROCESS_INFORMATION pi{}; + si.cb = sizeof(si); + si.dwFlags |= STARTF_USESTDHANDLES; + si.hStdOutput = stdout_write; + si.hStdError = stderr_write; + si.hStdInput = GetStdHandle(STD_INPUT_HANDLE); + + std::string cmd = cmdline.str(); + if (CreateProcessA(nullptr, cmd.data(), nullptr, nullptr, TRUE, 0, nullptr, nullptr, &si, &pi)) { + CloseHandle(stdout_write); + CloseHandle(stderr_write); + + handle.proc_info = pi; + handle.valid = true; + handle.stdout_read = stdout_read; + handle.stderr_read = stderr_read; + + handle.stdout_thread = std::thread([fd = handle.stdout_read]() { + HANDLE hStdout = GetStdHandle(STD_OUTPUT_HANDLE); + char buf[4096]; + 
DWORD read = 0, written = 0; + while (ReadFile(fd, buf, sizeof(buf), &read, nullptr) && read > 0) { + WriteFile(hStdout, buf, read, &written, nullptr); + } + }); + + handle.stderr_thread = std::thread([fd = handle.stderr_read]() { + HANDLE hStderr = GetStdHandle(STD_ERROR_HANDLE); + char buf[4096]; + DWORD read = 0, written = 0; + while (ReadFile(fd, buf, sizeof(buf), &read, nullptr) && read > 0) { + WriteFile(hStderr, buf, read, &written, nullptr); + } + }); + } else { + CloseHandle(stdout_read); + CloseHandle(stdout_write); + CloseHandle(stderr_read); + CloseHandle(stderr_write); + } + +#else + std::ostringstream cmd_str; + for (size_t i = 0; i < args.size(); ++i) { + if (i > 0) { + cmd_str << ' '; + } + const std::string & part = args[i]; + if (part.find(' ') != std::string::npos) { + cmd_str << '"' << part << '"'; + } else { + cmd_str << part; + } + } + + LOG_INF("Spawn command: %s\n", cmd_str.str().c_str()); + + int stdout_pipe[2] = {-1, -1}; + int stderr_pipe[2] = {-1, -1}; + + if (pipe(stdout_pipe) != 0 || pipe(stderr_pipe) != 0) { + LOG_ERR("pipe creation failed while spawning %s\n", args[0].c_str()); + if (stdout_pipe[0] != -1) { + close(stdout_pipe[0]); + } + if (stdout_pipe[1] != -1) { + close(stdout_pipe[1]); + } + if (stderr_pipe[0] != -1) { + close(stderr_pipe[0]); + } + if (stderr_pipe[1] != -1) { + close(stderr_pipe[1]); + } + return handle; + } + + pid_t pid = fork(); + if (pid == 0) { + close(stdout_pipe[0]); + close(stderr_pipe[0]); + dup2(stdout_pipe[1], STDOUT_FILENO); + dup2(stderr_pipe[1], STDERR_FILENO); + close(stdout_pipe[1]); + close(stderr_pipe[1]); + + std::vector cargs; + cargs.reserve(args.size() + 1); + for (const auto & arg : args) { + cargs.push_back(const_cast(arg.c_str())); + } + cargs.push_back(nullptr); + execvp(cargs[0], cargs.data()); + _exit(1); + } else if (pid > 0) { + close(stdout_pipe[1]); + close(stderr_pipe[1]); + + handle.pid = pid; + handle.stdout_fd = stdout_pipe[0]; + handle.stderr_fd = stderr_pipe[0]; + + handle.stdout_thread = std::thread([fd = handle.stdout_fd]() { + char buf[4096]; + ssize_t n; + while ((n = read(fd, buf, sizeof(buf))) > 0) { + write(STDOUT_FILENO, buf, n); + } + }); + + handle.stderr_thread = std::thread([fd = handle.stderr_fd]() { + char buf[4096]; + ssize_t n; + while ((n = read(fd, buf, sizeof(buf))) > 0) { + write(STDERR_FILENO, buf, n); + } + }); + + LOG_INF("Spawned child pid=%d\n", static_cast(pid)); + } else { + close(stdout_pipe[0]); + close(stdout_pipe[1]); + close(stderr_pipe[0]); + close(stderr_pipe[1]); + LOG_ERR("fork failed while spawning %s\n", args[0].c_str()); + } +#endif + + return handle; +} + +bool wait_for_backend_ready(int port, const std::string & health_endpoint, int timeout_ms, const ProcessHandle * process) { + httplib::Client client("127.0.0.1:" + std::to_string(port)); + const auto start = std::chrono::steady_clock::now(); + auto next_log_ms = 0; + + const std::string endpoint = health_endpoint.empty() ? "/health" : health_endpoint; + + LOG_INF("Waiting up to %d ms for backend readiness on port %d (endpoint %s)\n", + timeout_ms, + port, + endpoint.c_str()); + + while (true) { + try { + auto res = client.Get(endpoint.c_str()); + if (res && res->status == 200) { + LOG_INF("Backend on port %d reports ready\n", port); + return true; + } + LOG_DBG("Health check on port %d returned status %d\n", port, res ? 
res->status : -1); + } catch (const std::exception & e) { + LOG_DBG("Health check for port %d failed: %s\n", port, e.what()); + } + + auto elapsed_ms = + std::chrono::duration_cast(std::chrono::steady_clock::now() - start).count(); + if (elapsed_ms >= timeout_ms) { + break; + } + + if (process && !process_running(*process)) { + LOG_ERR("Backend process for port %d exited after %lld ms while waiting for readiness\n", + port, + static_cast(elapsed_ms)); + return false; + } + + if (elapsed_ms >= next_log_ms) { + LOG_INF("Still waiting for backend on port %d (elapsed %lld ms)\n", port, static_cast(elapsed_ms)); + next_log_ms += 1000; + } + + std::this_thread::sleep_for(std::chrono::milliseconds(ROUTER_BACKEND_HEALTH_POLL_MS)); + } + + return false; +} diff --git a/tools/router/router-process.h b/tools/router/router-process.h new file mode 100644 index 000000000..981c1498a --- /dev/null +++ b/tools/router/router-process.h @@ -0,0 +1,48 @@ +#pragma once + +#include +#include +#include + +#if defined(_WIN32) +# define WIN32_LEAN_AND_MEAN +# ifndef NOMINMAX +# define NOMINMAX +# endif +# include +#else +# include +#endif + +struct ProcessHandle { +#if defined(_WIN32) + PROCESS_INFORMATION proc_info{}; + bool valid = false; + HANDLE stdout_read = nullptr; + HANDLE stderr_read = nullptr; + std::thread stdout_thread; + std::thread stderr_thread; +#else + pid_t pid = -1; + int stdout_fd = -1; + int stderr_fd = -1; + std::thread stdout_thread; + std::thread stderr_thread; +#endif + + ProcessHandle() = default; + ProcessHandle(const ProcessHandle &) = delete; + ProcessHandle & operator=(const ProcessHandle &) = delete; + ProcessHandle(ProcessHandle &&) = default; + ProcessHandle & operator=(ProcessHandle &&) = default; +}; + +bool process_running(const ProcessHandle & handle); +void close_process(ProcessHandle & handle); +void terminate_process(ProcessHandle & handle); +bool wait_for_process_exit(const ProcessHandle & handle, int timeout_ms); +ProcessHandle spawn_process(const std::vector & args); +bool wait_for_backend_ready(int port, + const std::string & health_endpoint, + int timeout_ms, + const ProcessHandle * process = nullptr); diff --git a/tools/router/router-proxy.cpp b/tools/router/router-proxy.cpp new file mode 100644 index 000000000..ab9682da8 --- /dev/null +++ b/tools/router/router-proxy.cpp @@ -0,0 +1,294 @@ +#include "router-proxy.h" + +#include "log.h" +#include "router-app.h" +#include "router-config.h" + +#include + +#include +#include +#include +#include +#include +#include + +namespace { +void copy_response_headers(const httplib::Headers & from, httplib::Response & to) { + for (const auto & h : from) { + if (h.first == "Transfer-Encoding" || h.first == "Content-Length") { + continue; + } + to.set_header(h.first, h.second); + } +} + +bool matches_any_endpoint(const std::string & path, const std::vector & patterns) { + if (patterns.empty()) { + return true; + } + + for (const auto & pattern : patterns) { + if (path.compare(0, pattern.size(), pattern) == 0) { + return true; + } + } + + return false; +} + +std::string format_reasoning_chunk(const std::string & message) { + nlohmann::json chunk = { + {"choices", + {{{"delta", {{"reasoning_content", message}}}, {"index", 0}}}}, + }; + + return "data: " + chunk.dump() + "\n\n"; +} + +} // namespace + +bool proxy_request(const httplib::Request & req, + httplib::Response & res, + RouterApp & app, + const std::string & model_name, + const std::vector & proxy_endpoints, + const std::string & override_path) { + const RouterOptions & opts = 
app.get_config().router; + const std::string forwarded_path = !override_path.empty() ? override_path : (!req.target.empty() ? req.target : req.path); + + if (!matches_any_endpoint(forwarded_path, proxy_endpoints)) { + LOG_WRN("Request %s not proxied because it does not match configured endpoints\n", forwarded_path.c_str()); + res.status = 404; + res.set_content("{\"error\":\"endpoint not proxied\"}", "application/json"); + return false; + } + + const std::string content_type = req.get_header_value("Content-Type", "application/json"); + httplib::Headers headers = req.headers; + headers.erase("Host"); + + const auto accept_header = req.get_header_value("Accept"); + bool wants_stream = accept_header.find("text/event-stream") != std::string::npos; + + try { + auto body_json = nlohmann::json::parse(req.body); + if (body_json.value("stream", false)) { + wants_stream = true; + } + } catch (const std::exception &) { + if (req.body.find("\"stream\":true") != std::string::npos) { + wants_stream = true; + } + } + + const std::string method = req.method; + const std::string request_body = req.body; + + if (wants_stream) { + struct StreamState { + std::mutex mutex; + std::condition_variable cv; + std::queue chunks; + bool done = false; + bool headers_dispatched = false; + std::string content_type = "text/event-stream"; + int status = 200; + std::string reason = "OK"; + httplib::Headers upstream_headers; + }; + + auto state_ptr = std::make_shared(); + + auto content_receiver = [state_ptr](const char * data, size_t len) { + { + std::lock_guard lock(state_ptr->mutex); + state_ptr->chunks.emplace(data, len); + } + state_ptr->cv.notify_one(); + return true; + }; + + auto upstream_thread = std::make_shared([state_ptr, + &app, + model_name, + forwarded_path, + headers, + method, + request_body, + content_type, + opts, + content_receiver]() { + const bool sink_enabled = opts.notify_model_swap; + if (sink_enabled) { + app.set_notification_sink([state_ptr](const ProgressNotification & notif) { + const auto chunk = format_reasoning_chunk(notif.message); + { + std::lock_guard lock(state_ptr->mutex); + state_ptr->chunks.push(chunk); + } + state_ptr->cv.notify_one(); + }); + } + + auto clear_sink = [&app, sink_enabled]() { + if (sink_enabled) { + app.clear_notification_sink(); + } + }; + + std::string error; + if (!app.ensure_running(model_name, error)) { + clear_sink(); + { + std::lock_guard lock(state_ptr->mutex); + state_ptr->status = 404; + state_ptr->reason = "Not Found"; + state_ptr->content_type = "application/json"; + state_ptr->chunks.emplace("{\"error\":\"" + error + "\"}"); + } + state_ptr->done = true; + state_ptr->cv.notify_all(); + return; + } + + clear_sink(); + + const std::string upstream_base = app.upstream_for(model_name); + if (upstream_base.empty()) { + { + std::lock_guard lock(state_ptr->mutex); + state_ptr->status = 502; + state_ptr->reason = "Bad Gateway"; + state_ptr->content_type = "application/json"; + state_ptr->chunks.emplace("{\"error\":\"missing upstream\"}"); + } + state_ptr->done = true; + state_ptr->cv.notify_all(); + return; + } + + LOG_INF("Proxying %s %s to upstream %s\n", method.c_str(), forwarded_path.c_str(), upstream_base.c_str()); + auto local_client = std::make_shared(upstream_base.c_str()); + local_client->set_connection_timeout(opts.connection_timeout_s, 0); + local_client->set_read_timeout(opts.read_timeout_s, 0); + + httplib::Result result; + if (method == "POST") { + result = local_client->Post(forwarded_path.c_str(), headers, request_body, content_type.c_str(), 
content_receiver); + if (result) { + std::lock_guard lock(state_ptr->mutex); + state_ptr->status = result->status; + state_ptr->reason = result->reason; + state_ptr->upstream_headers = result->headers; + state_ptr->content_type = result->get_header_value("Content-Type", "text/event-stream"); + } + } else { + auto response_handler = [state_ptr](const httplib::Response & upstream) { + std::lock_guard lock(state_ptr->mutex); + state_ptr->status = upstream.status; + state_ptr->reason = upstream.reason; + state_ptr->upstream_headers = upstream.headers; + state_ptr->content_type = upstream.get_header_value("Content-Type", "text/event-stream"); + return true; + }; + result = local_client->Get(forwarded_path.c_str(), headers, response_handler, content_receiver); + } + + std::lock_guard lock(state_ptr->mutex); + if (!result) { + state_ptr->status = 502; + state_ptr->reason = "Bad Gateway"; + state_ptr->content_type = "application/json"; + state_ptr->chunks.emplace("{\"error\":\"upstream unavailable\"}"); + } + state_ptr->done = true; + state_ptr->cv.notify_all(); + }); + + res.status = 200; + res.set_chunked_content_provider( + "text/event-stream", + [state_ptr, upstream_thread, &res](size_t, httplib::DataSink & sink) { + std::unique_lock lock(state_ptr->mutex); + state_ptr->cv.wait(lock, [&] { return !state_ptr->chunks.empty() || state_ptr->done; }); + + if (!state_ptr->chunks.empty()) { + auto chunk = std::move(state_ptr->chunks.front()); + state_ptr->chunks.pop(); + + if (!state_ptr->headers_dispatched) { + res.status = state_ptr->status; + res.reason = state_ptr->reason; + copy_response_headers(state_ptr->upstream_headers, res); + res.set_header("Content-Type", state_ptr->content_type); + state_ptr->headers_dispatched = true; + } + + lock.unlock(); + return sink.write(chunk.data(), chunk.size()); + } + + if (state_ptr->done) { + lock.unlock(); + sink.done(); + return false; + } + + lock.unlock(); + return false; + }, + [state_ptr, upstream_thread](bool) { + (void) state_ptr; + if (upstream_thread && upstream_thread->joinable()) { + upstream_thread->join(); + } + }); + + return true; + } + + std::string error; + if (!app.ensure_running(model_name, error)) { + res.status = 404; + res.set_content("{\"error\":\"" + error + "\"}", "application/json"); + return false; + } + + const std::string upstream_base = app.upstream_for(model_name); + if (upstream_base.empty()) { + res.status = 502; + res.set_content("{\"error\":\"missing upstream\"}", "application/json"); + return false; + } + + LOG_INF("Proxying %s %s to upstream %s\n", req.method.c_str(), forwarded_path.c_str(), upstream_base.c_str()); + auto client = std::make_shared(upstream_base.c_str()); + client->set_connection_timeout(opts.connection_timeout_s, 0); + client->set_read_timeout(opts.read_timeout_s, 0); + + httplib::Result result; + if (method == "POST") { + result = client->Post(forwarded_path.c_str(), headers, request_body, content_type.c_str()); + } else { + result = client->Get(forwarded_path.c_str(), headers); + } + + if (!result) { + LOG_ERR("Upstream %s unavailable for %s %s\n", upstream_base.c_str(), method.c_str(), forwarded_path.c_str()); + res.status = 502; + res.set_content("{\"error\":\"upstream unavailable\"}", "application/json"); + return false; + } + + res.status = result->status; + res.reason = result->reason; + for (const auto & h : result->headers) { + res.set_header(h.first, h.second); + } + + const auto ct = result->get_header_value("Content-Type", "application/octet-stream"); + res.set_content(result->body, 
ct.c_str()); + LOG_INF("Upstream response %d (%s) relayed for %s\n", res.status, res.reason.c_str(), forwarded_path.c_str()); + return true; +} diff --git a/tools/router/router-proxy.h b/tools/router/router-proxy.h new file mode 100644 index 000000000..e72a2f4f4 --- /dev/null +++ b/tools/router/router-proxy.h @@ -0,0 +1,15 @@ +#pragma once + +#include "router-app.h" + +#include + +#include +#include + +bool proxy_request(const httplib::Request & req, + httplib::Response & res, + RouterApp & app, + const std::string & model_name, + const std::vector & proxy_endpoints, + const std::string & override_path = {}); diff --git a/tools/router/router-scanner.cpp b/tools/router/router-scanner.cpp new file mode 100644 index 000000000..48f40703f --- /dev/null +++ b/tools/router/router-scanner.cpp @@ -0,0 +1,231 @@ +#include "router-scanner.h" + +#include "log.h" +#include "router-config.h" + +#include "common.h" + +#include +#include +#include +#include +#include +#include +#include + +using json = nlohmann::json; + +static std::string sanitize_repo_filename(const std::string & repo, const std::string & filename) { + std::string name = repo + "_" + filename; + std::replace(name.begin(), name.end(), '/', '_'); + return name; +} + +static std::unordered_map load_mmproj_map(const std::string & cache_dir) { + std::unordered_map mapping; + + std::error_code ec; + for (std::filesystem::directory_iterator it(cache_dir, ec), end; it != end && !ec; ++it) { + if (!it->is_regular_file()) { + continue; + } + + const std::string name = it->path().filename().string(); + constexpr std::string_view prefix = "manifest="; + constexpr std::string_view suffix = ".json"; + if (name.rfind(prefix, 0) != 0 || + name.size() <= prefix.size() + suffix.size() || + name.substr(name.size() - suffix.size()) != suffix) { + continue; + } + + const std::string without_ext = it->path().stem().string(); + std::string encoded = without_ext.substr(prefix.size()); + + std::vector parts; + size_t pos = 0; + while (pos <= encoded.size()) { + size_t next = encoded.find('=', pos); + if (next == std::string::npos) { + parts.push_back(encoded.substr(pos)); + break; + } + parts.push_back(encoded.substr(pos, next - pos)); + pos = next + 1; + } + + if (parts.size() < 3) { + continue; + } + + const std::string repo = parts[0] + "/" + parts[1]; + + json manifest; + try { + std::ifstream fin(it->path()); + if (!fin) { + continue; + } + manifest = json::parse(fin); + } catch (const std::exception &) { + continue; + } + + auto extract_rfilename = [](const json & obj, const char * key) -> std::string { + if (obj.contains(key) && obj[key].contains("rfilename")) { + return obj[key]["rfilename"].get(); + } + return {}; + }; + + const std::string gguf_file = extract_rfilename(manifest, "ggufFile"); + const std::string mmproj_file = extract_rfilename(manifest, "mmprojFile"); + + if (gguf_file.empty() || mmproj_file.empty()) { + continue; + } + + const std::string gguf_path = fs_get_cache_file(sanitize_repo_filename(repo, gguf_file)); + const std::string mmproj_path = fs_get_cache_file(sanitize_repo_filename(repo, mmproj_file)); + + if (!std::filesystem::exists(gguf_path, ec) || !std::filesystem::exists(mmproj_path, ec)) { + continue; + } + + mapping.emplace(gguf_path, mmproj_path); + } + + return mapping; +} + +std::vector scan_default_models() { + std::vector models; + + std::string cache_dir = fs_get_cache_directory(); + + std::error_code ec; + if (!std::filesystem::exists(cache_dir, ec) || ec) { + return models; + } + + const auto mmproj_map = 
load_mmproj_map(cache_dir); + std::unordered_set projector_paths; + for (const auto & entry : mmproj_map) { + projector_paths.insert(entry.second); + } + + std::unordered_set seen; + + for (std::filesystem::recursive_directory_iterator it(cache_dir, ec), end; it != end && !ec; ++it) { + if (!it->is_regular_file()) { + continue; + } + std::string ext = it->path().extension().string(); + std::transform(ext.begin(), ext.end(), ext.begin(), [](unsigned char c) { return static_cast(std::tolower(c)); }); + if (ext != ".gguf") { + continue; + } + + std::string full_path = it->path().string(); + if (seen.count(full_path)) { + continue; + } + seen.insert(full_path); + + if (projector_paths.count(full_path)) { + continue; + } + + ModelConfig mc; + mc.name = it->path().filename().string(); + mc.path = full_path; + mc.state = "auto"; + if (auto it_mmproj = mmproj_map.find(full_path); it_mmproj != mmproj_map.end()) { + mc.spawn = get_default_spawn(); + mc.spawn.command.push_back("--mmproj"); + mc.spawn.command.push_back(it_mmproj->second); + } + + models.push_back(std::move(mc)); + } + + LOG_INF("Model scanner found %zu candidates in %s\n", models.size(), cache_dir.c_str()); + return models; +} + +static std::string find_mmproj_in_dir(const std::filesystem::path & dir) { + static const std::vector priorities = {"bf16.gguf", "f16.gguf", "f32.gguf"}; + + std::error_code ec; + for (const auto & priority : priorities) { + for (std::filesystem::directory_iterator it(dir, ec), end; it != end && !ec; ++it) { + if (!it->is_regular_file()) { + continue; + } + + std::string filename = it->path().filename().string(); + std::transform(filename.begin(), filename.end(), filename.begin(), [](unsigned char c) { return static_cast(std::tolower(c)); }); + + if (filename.find("mmproj") == std::string::npos) { + continue; + } + + if (filename.size() < priority.size() || filename.rfind(priority) != filename.size() - priority.size()) { + continue; + } + + return it->path().string(); + } + } + + return {}; +} + +std::vector scan_custom_dir(const std::string & path, const std::string & state) { + std::vector models; + + std::error_code ec; + if (!std::filesystem::exists(path, ec) || !std::filesystem::is_directory(path, ec) || ec) { + return models; + } + + std::unordered_set seen; + + for (std::filesystem::recursive_directory_iterator it(path, ec), end; it != end && !ec; ++it) { + if (!it->is_regular_file()) { + continue; + } + std::string ext = it->path().extension().string(); + std::transform(ext.begin(), ext.end(), ext.begin(), [](unsigned char c) { return static_cast(std::tolower(c)); }); + if (ext != ".gguf") { + continue; + } + + std::string full_path = it->path().string(); + if (seen.count(full_path)) { + continue; + } + seen.insert(full_path); + + std::string filename = it->path().filename().string(); + std::transform(filename.begin(), filename.end(), filename.begin(), [](unsigned char c) { return static_cast(std::tolower(c)); }); + + if (filename.find("mmproj") != std::string::npos) { + continue; + } + + ModelConfig mc; + mc.name = it->path().filename().string(); + mc.path = full_path; + mc.state = state; + if (auto mmproj_path = find_mmproj_in_dir(it->path().parent_path()); !mmproj_path.empty()) { + mc.spawn = get_default_spawn(); + mc.spawn.command.push_back("--mmproj"); + mc.spawn.command.push_back(mmproj_path); + } + + models.push_back(std::move(mc)); + } + + return models; +} diff --git a/tools/router/router-scanner.h b/tools/router/router-scanner.h new file mode 100644 index 000000000..ae3aca2b4 --- /dev/null 
+++ b/tools/router/router-scanner.h @@ -0,0 +1,9 @@ +#pragma once + +#include "router-config.h" + +#include +#include + +std::vector scan_default_models(); +std::vector scan_custom_dir(const std::string & path, const std::string & state); diff --git a/tools/router/router.cpp b/tools/router/router.cpp new file mode 100644 index 000000000..1499b2820 --- /dev/null +++ b/tools/router/router.cpp @@ -0,0 +1,268 @@ +#include "common.h" +#include "download.h" +#include "log.h" +#include "logging.h" +#include "router-app.h" +#include "router-config.h" +#include "router-constants.h" +#include "router-scanner.h" +#include "router-admin.h" +#include "router-endpoints.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +static std::atomic g_shutdown{false}; +static httplib::Server * g_server = nullptr; + +static void signal_handler(int) { + g_shutdown = true; + if (g_server) { + g_server->stop(); + } +} + +struct CliOptions { + bool show_help = false; + std::string hf_repo; + std::string hf_file; + std::string config_path; + std::string import_dir; +}; + +static bool parse_cli(int argc, char ** argv, CliOptions & out) { + for (int i = 1; i < argc; ++i) { + std::string arg = argv[i]; + if (arg == "-h" || arg == "--help") { + out.show_help = true; + } else if (arg == "-hf" || arg == "-hfr" || arg == "--hf-repo") { + if (i + 1 >= argc) { + fprintf(stderr, "error: missing value for %s\n", arg.c_str()); + return false; + } + out.hf_repo = argv[++i]; + } else if (arg == "-hff" || arg == "--hf-file") { + if (i + 1 >= argc) { + fprintf(stderr, "error: missing value for %s\n", arg.c_str()); + return false; + } + out.hf_file = argv[++i]; + } else if (arg == "--config") { + if (i + 1 >= argc) { + fprintf(stderr, "error: missing value for --config\n"); + return false; + } + out.config_path = argv[++i]; + } else if (arg == "--import-dir") { + if (i + 1 >= argc) { + fprintf(stderr, "error: missing value for --import-dir\n"); + return false; + } + out.import_dir = argv[++i]; + } else { + fprintf(stderr, "warning: unknown argument %s\n", arg.c_str()); + } + } + return true; +} + +static void print_help() { + printf("usage: llama-router [options]\n\n"); + printf("Options:\n"); + printf(" -h, --help Show this help message\n"); + printf(" --config Override config path (default: ~/.config/llama.cpp/router-config.json)\n"); + printf(" -hf, -hfr, --hf-repo Hugging Face repository to download (format /[:quant])\n"); + printf(" -hff, --hf-file Specific GGUF filename to fetch from repository\n"); + printf(" --import-dir Recursively import GGUF models from directory\n"); +} + +static bool handle_download(const CliOptions & opts, const std::string & config_path) { + if (opts.hf_repo.empty()) { + return false; + } + + const char * hf_token = std::getenv("HF_TOKEN"); + std::string token = hf_token ? std::string(hf_token) : std::string(); + + try { + auto resolved = common_get_hf_file(opts.hf_repo, token, false); + std::string repo = resolved.repo; + std::string file = !opts.hf_file.empty() ? 
opts.hf_file : resolved.ggufFile; + if (file.empty()) { + fprintf(stderr, "error: unable to find GGUF file in repo %s\n", repo.c_str()); + return true; + } + + std::string url = get_model_endpoint() + repo + "/resolve/main/" + file; + std::string filename = repo + "_" + file; + string_replace_all(filename, "/", "_"); + std::string local_path = fs_get_cache_file(filename); + + common_params_model model; + model.hf_repo = repo; + model.hf_file = file; + model.url = url; + model.path = local_path; + + LOG_INF("Downloading %s to %s\n", url.c_str(), local_path.c_str()); + if (!common_download_model(model, token, false)) { + fprintf(stderr, "download failed\n"); + return true; + } + + try { + auto cfg = load_config(config_path); + + auto rescan_result = rescan_auto_models(cfg); + cfg = rescan_result.config; + + if (cfg.startup_model.empty()) { + cfg.startup_model = filename; + write_config_file(cfg, config_path); + LOG_INF("Configured startup_model to '%s' after download\n", filename.c_str()); + } else { + LOG_INF("startup_model already set to '%s', leaving unchanged\n", cfg.startup_model.c_str()); + } + } catch (const std::exception & e) { + LOG_WRN("Failed to set startup_model after download: %s\n", e.what()); + } + } catch (const std::exception & e) { + fprintf(stderr, "hf download error: %s\n", e.what()); + } + return true; +} + +static bool handle_import(const CliOptions & opts, const std::string & config_path, int & exit_code) { + if (opts.import_dir.empty()) { + return false; + } + + exit_code = 0; + + const std::string import_dir = expand_user_path(opts.import_dir); + auto scanned = scan_custom_dir(import_dir, "manual"); + + RouterConfig cfg; + try { + cfg = load_config(config_path); + } catch (const std::exception & e) { + fprintf(stderr, "%s\n", e.what()); + exit_code = 1; + return true; + } + + std::unordered_set existing_paths; + for (const auto & model : cfg.models) { + existing_paths.insert(expand_user_path(model.path)); + } + + size_t added = 0; + for (auto & model : scanned) { + const auto expanded = expand_user_path(model.path); + if (existing_paths.insert(expanded).second) { + cfg.models.push_back(std::move(model)); + ++added; + } + } + + if (added > 0) { + write_config_file(cfg, config_path); + } + + LOG_INF("Imported %zu models from %s\n", added, import_dir.c_str()); + return true; +} + +int main(int argc, char ** argv) { + CliOptions cli; + router_log_init(); + + LOG_INF("Parsing %d CLI arguments for llama-router\n", argc); + + if (!parse_cli(argc, argv, cli)) { + return 1; + } + + if (cli.show_help) { + print_help(); + return 0; + } + + std::string config_path = !cli.config_path.empty() ? 
expand_user_path(cli.config_path) : get_default_config_path(); + + if (handle_download(cli, config_path)) { + LOG_INF("Download-only mode completed, exiting\n"); + return 0; + } + + int import_exit_code = 0; + if (handle_import(cli, config_path, import_exit_code)) { + return import_exit_code; + } + LOG_INF("Loading router configuration from %s\n", config_path.c_str()); + + RouterConfig cfg; + try { + cfg = load_config(config_path); + } catch (const std::exception & e) { + fprintf(stderr, "%s\n", e.what()); + return 1; + } + + LOG_INF("Router configuration loaded: %zu models, base port %d, listen %s:%d\n", + cfg.models.size(), cfg.router.base_port, cfg.router.host.c_str(), cfg.router.port); + + RouterApp app(cfg); + LOG_INF("Initialized RouterApp with default spawn command size=%zu\n", cfg.default_spawn.command.size()); + + if (!cfg.startup_model.empty()) { + std::string error; + LOG_INF("Ensuring startup model '%s' is running before accepting requests\n", cfg.startup_model.c_str()); + if (!app.ensure_running(cfg.startup_model, error)) { + LOG_ERR("Failed to start startup_model '%s': %s\n", cfg.startup_model.c_str(), error.c_str()); + return 1; + } + LOG_INF("Startup model '%s' is ready\n", cfg.startup_model.c_str()); + } + + httplib::Server server; + g_server = &server; + signal(SIGINT, signal_handler); + signal(SIGTERM, signal_handler); + register_routes(server, app); + register_admin_routes(server, app, config_path); + + std::string host = cfg.router.host; + int port = cfg.router.port; + + LOG_INF("llama-router listening on %s:%d\n", host.c_str(), port); + std::atomic listen_ok{true}; + std::thread server_thread([&]() { + if (!server.listen(host.c_str(), port)) { + listen_ok = false; + g_shutdown = true; + } + }); + + while (!g_shutdown.load()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + server.stop(); + if (server_thread.joinable()) { + server_thread.join(); + } + + LOG_INF("llama-router shutting down, stopping all managed models\n"); + app.stop_all(); + g_server = nullptr; + return listen_ok ? 0 : 1; +} diff --git a/tools/server/public/index.html.gz b/tools/server/public/index.html.gz index ae25b6ddf..6497c3171 100644 Binary files a/tools/server/public/index.html.gz and b/tools/server/public/index.html.gz differ
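
For reference, a minimal client-side sketch of the chat-completions route registered in `router-endpoints.cpp`, using the same cpp-httplib API the router links against. The listen address, port, model name, and message body are placeholder values, not part of the patch; the only behavior assumed is what `register_routes` and `parse_model_from_chat` implement above (the request must be valid JSON with a non-empty `"model"` field).

```cpp
// Illustrative client for POST /v1/chat/completions through llama-router.
// Host, port, and model name are placeholders.
#include <httplib.h>

#include <cstdio>
#include <string>

int main() {
    httplib::Client cli("127.0.0.1", 8082);  // router host/port from router-config

    const std::string body = R"({
        "model": "ModelA.gguf",
        "messages": [{"role": "user", "content": "Hello"}]
    })";

    // The router extracts the "model" field and proxies the request to that
    // backend, spawning it first if it is not already running.
    auto res = cli.Post("/v1/chat/completions", body, "application/json");
    if (res) {
        printf("status %d\n%s\n", res->status, res->body.c_str());
    }
    return (res && res->status == 200) ? 0 : 1;
}
```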
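Similarly, a sketch of how the process-management API declared in `router-process.h` fits together: spawn a backend, poll its health endpoint, then terminate it (SIGTERM escalating to SIGKILL after `ROUTER_PROCESS_SHUTDOWN_TIMEOUT_MS` on POSIX). The backend command line and port are placeholders; only the function signatures from the header above are assumed.

```cpp
// Illustrative use of spawn_process / wait_for_backend_ready / terminate_process.
// The llama-server arguments and port are placeholder values.
#include "router-constants.h"
#include "router-process.h"

#include <string>
#include <vector>

int main() {
    const int port = 50000;
    const std::vector<std::string> cmd = {
        "llama-server", "--port", std::to_string(port), "-m", "/path/to/model.gguf",
    };

    ProcessHandle handle = spawn_process(cmd);
    if (!process_running(handle)) {
        return 1;
    }

    // Poll /health until the backend answers 200 or the timeout expires;
    // passing the handle lets the wait abort early if the child exits.
    const bool ready = wait_for_backend_ready(port, "/health", ROUTER_BACKEND_READY_TIMEOUT_MS, &handle);

    terminate_process(handle);
    return ready ? 0 : 1;
}
```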