Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions cpp/include/tensorrt_llm/common/tllmException.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,33 @@

#include <array>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>

/// @brief Builds a tensorrt_llm::common::TllmException capturing the current file and line;
/// the variadic arguments are printf-style and are formatted via tensorrt_llm::common::fmtstr.
#define NEW_TLLM_EXCEPTION(...) \
    tensorrt_llm::common::TllmException(__FILE__, __LINE__, tensorrt_llm::common::fmtstr(__VA_ARGS__).c_str())

/// @brief Builds a tensorrt_llm::common::RequestSpecificException bound to a request ID and an
/// error code, capturing the current file and line; the trailing arguments are printf-style and
/// are formatted via tensorrt_llm::common::fmtstr.
#define NEW_TLLM_REQUEST_SPECIFIC_EXCEPTION_WITH_ERROR_CODE(requestID, errorCode, ...) \
    tensorrt_llm::common::RequestSpecificException( \
        __FILE__, __LINE__, tensorrt_llm::common::fmtstr(__VA_ARGS__).c_str(), requestID, errorCode)

namespace tensorrt_llm::common
{

/// @brief Enumeration of different error codes for request-specific exceptions.
/// Codes are grouped into numeric ranges by category so new codes can be added
/// within a category without renumbering existing values.
enum class RequestErrorCode : uint32_t
{
    // General errors (0-999)
    kUNKNOWN_ERROR = 0,

    // Network and communication errors (1000-1999)
    kNETWORK_ERROR = 1000,
};

/// @brief Sentinel request ID used when a failure cannot be attributed to a specific request.
/// Declared `inline` (C++17) so every translation unit that includes this header shares a single
/// entity instead of each getting its own internal-linkage copy, as `static constexpr` would.
inline constexpr uint64_t kUNKNOWN_REQUEST_ID = static_cast<uint64_t>(-1);

class TllmException : public std::runtime_error
{
public:
Expand All @@ -45,4 +63,21 @@ class TllmException : public std::runtime_error
int mNbFrames;
};

/// @brief Exception that carries the ID of the failing request plus a categorized error code,
/// so request-level failures can be attributed and reported back to the originating caller.
class RequestSpecificException : public std::runtime_error
{
public:
    /// @param file Source file where the exception was raised (typically __FILE__).
    /// @param line Source line where the exception was raised (typically __LINE__).
    /// @param msg Human-readable description of the failure.
    /// @param requestID ID of the affected request, or kUNKNOWN_REQUEST_ID when not attributable.
    /// @param errorCode Category of the failure (see RequestErrorCode).
    explicit RequestSpecificException(
        char const* file, std::size_t line, char const* msg, uint64_t requestID, RequestErrorCode errorCode);

    // std::runtime_error has a virtual destructor, so `override` is well-formed here.
    ~RequestSpecificException() noexcept override;

    /// @brief Returns the ID of the request associated with this exception.
    [[nodiscard]] uint64_t getRequestId() const noexcept;

    /// @brief Returns the categorized error code describing the failure.
    [[nodiscard]] RequestErrorCode getErrorCode() const noexcept;

private:
    uint64_t mRequestID;
    RequestErrorCode mErrorCode;
};

} // namespace tensorrt_llm::common
1 change: 1 addition & 0 deletions cpp/include/tensorrt_llm/executor/dataTransceiverState.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
namespace tensorrt_llm::executor
{

// Forward declaration
class Serialization;

namespace kv_cache
Expand Down
117 changes: 117 additions & 0 deletions cpp/include/tensorrt_llm/executor/dataTransceiverStateUtils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Missing include guard.

Header files must use include guards named TRTLLM_<FILENAME>_H according to the coding guidelines.

Apply this diff to add the required include guard:

-#pragma once
+#ifndef TRTLLM_DATATRANSCEIVERSTATE_UTILS_H
+#define TRTLLM_DATATRANSCEIVERSTATE_UTILS_H

And add the closing endif at the end of the file:

 } // namespace tensorrt_llm::executor
+
+#endif // TRTLLM_DATATRANSCEIVERSTATE_UTILS_H

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In cpp/include/tensorrt_llm/executor/dataTransceiverStateUtils.h around line 17,
the header is missing the required include guard; add a top-level include guard
using the macro name TRTLLM_DATATRANSCEIVERSTATE_UTILS_H (matching the suggested
diff above, per the coding guidelines) and place the corresponding #endif at the
end of the file so the file is properly protected against multiple inclusion.


#include "tensorrt_llm/executor/dataTransceiverState.h"
#include "tensorrt_llm/executor/serialization.h"

#include <cstdint>
#include <limits>
#include <stdexcept>
#include <string>
#include <vector>

namespace tensorrt_llm::executor
{

// Utility functions for creating and serializing DataTransceiverState

/**
* @brief Create a serialized DataTransceiverState with socket communication state
*
* @param nbKvHeadsPerLayer Vector of number of KV heads per layer
* @param sizePerHead Size of each attention head
* @param tokensPerBlock Number of tokens per block
* @param tensorParallelism Tensor parallelism size
* @param pipelineParallelism Pipeline parallelism size
* @param dataType Data type for the cache
* @param socketAddresses Vector of socket addresses for communication
* @param attentionType Attention type (DEFAULT or MLA)
* @param kvFactor KV factor (default: 2)
* @param enableAttentionDP Whether to enable attention data parallelism
* @param dpRank Data parallelism rank (default: 0)
* @param dpSize Data parallelism size (default: 0)
* @param rank Current rank
* @return std::vector<char> The serialized DataTransceiverState as bytes
*/
inline std::vector<char> createDataTransceiverStateSocket(std::vector<SizeType32> const& nbKvHeadsPerLayer,
SizeType32 sizePerHead, SizeType32 tokensPerBlock, SizeType32 tensorParallelism, SizeType32 pipelineParallelism,
nvinfer1::DataType dataType, std::vector<std::string> const& socketAddresses,
kv_cache::CacheState::AttentionType attentionType, int kvFactor, bool enableAttentionDP, int dpRank, int dpSize,
int rank)
Comment on lines +46 to +50
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding input validation for critical parameters.

Both functions accept multiple parameters that could be invalid (e.g., empty vectors, zero values for critical dimensions). Consider adding basic validation to prevent runtime errors downstream.

Add validation at the beginning of both functions:

 inline std::vector<char> createDataTransceiverStateSocket(std::vector<SizeType32> const& nbKvHeadsPerLayer,
     SizeType32 sizePerHead, SizeType32 tokensPerBlock, SizeType32 tensorParallelism, SizeType32 pipelineParallelism,
     nvinfer1::DataType dataType, std::vector<std::string> const& socketAddresses,
     kv_cache::CacheState::AttentionType attentionType, int kvFactor, bool enableAttentionDP, int dpRank, int dpSize,
     int rank)
 {
+    if (nbKvHeadsPerLayer.empty() || socketAddresses.empty() || sizePerHead == 0 || tokensPerBlock == 0)
+    {
+        throw std::invalid_argument("Invalid parameters: vectors cannot be empty and dimensions must be positive");
+    }

Also applies to: 91-95

🤖 Prompt for AI Agents
In cpp/include/tensorrt_llm/executor/dataTransceiverStateUtils.h around lines
46-50 (and similarly 91-95), add input validation at the start of both
functions: check that vectors (nbKvHeadsPerLayer, socketAddresses) are not
empty, that sizePerHead, tokensPerBlock, tensorParallelism, pipelineParallelism,
kvFactor are > 0, that rank, dpRank are >= 0 and
dpSize/tensorParallelism/pipelineParallelism are >= 1 and that rank/dpRank <
total ranks (e.g., < dpSize or < tensorParallelism*pipelineParallelism as
applicable); validate that attentionType and dataType are within expected enums
if applicable. If any check fails, throw a clear std::invalid_argument with a
message indicating which parameter is invalid (or return an error code if
project style prefers), so downstream code cannot proceed with invalid
dimensions. Ensure mirror checks are added to the second function at lines 91-95
with consistent error messages.

{
// Create CacheState using the simpler constructor
kv_cache::CacheState cacheState(nbKvHeadsPerLayer, sizePerHead, tokensPerBlock, tensorParallelism,
pipelineParallelism, dataType, attentionType, kvFactor, enableAttentionDP, dpRank, dpSize);

// Create Socket CommState
std::vector<kv_cache::SocketState> socketStates;
for (size_t i = 0; i < socketAddresses.size(); ++i)
{
kv_cache::SocketState socketState{static_cast<uint16_t>(8000 + i), socketAddresses[i]};
socketStates.emplace_back(std::move(socketState));
}
Comment on lines +58 to +62
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential integer overflow and type safety concerns.

There are several issues with the socket state creation:

  1. static_cast<uint16_t>(8000 + i) could overflow if i is large
  2. Using size_t for loop variable but casting to uint16_t
  3. No validation of port range limits

Apply this diff to add proper bounds checking and type safety:

     for (size_t i = 0; i < socketAddresses.size(); ++i)
     {
-        kv_cache::SocketState socketState{static_cast<uint16_t>(8000 + i), socketAddresses[i]};
+        if (8000 + i > std::numeric_limits<uint16_t>::max())
+        {
+            throw std::runtime_error("Port number exceeds uint16_t range");
+        }
+        kv_cache::SocketState socketState{static_cast<uint16_t>(8000 + i), socketAddresses[i]};
         socketStates.emplace_back(std::move(socketState));
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for (size_t i = 0; i < socketAddresses.size(); ++i)
{
kv_cache::SocketState socketState{static_cast<uint16_t>(8000 + i), socketAddresses[i]};
socketStates.emplace_back(std::move(socketState));
}
for (size_t i = 0; i < socketAddresses.size(); ++i)
{
if (8000 + i > std::numeric_limits<uint16_t>::max())
{
throw std::runtime_error("Port number exceeds uint16_t range");
}
kv_cache::SocketState socketState{static_cast<uint16_t>(8000 + i), socketAddresses[i]};
socketStates.emplace_back(std::move(socketState));
}
🤖 Prompt for AI Agents
In cpp/include/tensorrt_llm/executor/dataTransceiverStateUtils.h around lines
58-62, the code unsafely casts 8000 + i to uint16_t and uses size_t for the loop
without validating port range; change the loop to iterate with a well-typed
integer (e.g. uint32_t or size_t) but compute the port as a checked integer:
compute uint32_t port = basePort + static_cast<uint32_t>(i); verify port is
within the valid TCP port range (>=1 and <= 65535) and fits into uint16_t before
casting; if the check fails, handle it (throw an exception or return an error)
instead of silently wrapping; finally static_cast<uint16_t>(port) only after the
check and use that safe value when constructing kv_cache::SocketState.


kv_cache::CommState commState(std::move(socketStates), rank);

// Create DataTransceiverState
DataTransceiverState state(std::move(cacheState), std::move(commState));

// Serialize and return the serialized data
return Serialization::serialize(state);
}

/**
* @brief Create a serialized DataTransceiverState with agent communication state
*
* @param nbKvHeadsPerLayer Vector of number of KV heads per layer
* @param sizePerHead Size of each attention head
* @param tokensPerBlock Number of tokens per block
* @param tensorParallelism Tensor parallelism size
* @param pipelineParallelism Pipeline parallelism size
* @param dataType Data type for the cache
* @param agentNames Vector of agent names for communication
* @param attentionType Attention type (DEFAULT or MLA)
* @param kvFactor KV factor (default: 2)
* @param enableAttentionDP Whether to enable attention data parallelism
* @param dpRank Data parallelism rank (default: 0)
* @param dpSize Data parallelism size (default: 0)
* @param rank Current rank
* @return std::vector<char> The serialized DataTransceiverState as bytes
*/
inline std::vector<char> createDataTransceiverStateAgent(std::vector<SizeType32> const& nbKvHeadsPerLayer,
SizeType32 sizePerHead, SizeType32 tokensPerBlock, SizeType32 tensorParallelism, SizeType32 pipelineParallelism,
nvinfer1::DataType dataType, std::vector<std::string> const& agentNames,
kv_cache::CacheState::AttentionType attentionType, int kvFactor, bool enableAttentionDP, int dpRank, int dpSize,
int rank)
{
// Create CacheState using the simpler constructor
kv_cache::CacheState cacheState(nbKvHeadsPerLayer, sizePerHead, tokensPerBlock, tensorParallelism,
pipelineParallelism, dataType, attentionType, kvFactor, enableAttentionDP, dpRank, dpSize);

// Create Agent CommState
std::vector<kv_cache::AgentState> agentStates;
for (size_t i = 0; i < agentNames.size(); ++i)
{
agentStates.emplace_back(agentNames[i], "127.0.0.1:" + std::to_string(8000 + i));
}
Comment on lines +103 to +106
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Potential integer overflow in agent connection info.

Similar to the socket version, there's a potential overflow when converting 8000 + i to string without bounds checking.

Apply this diff to add bounds checking:

     for (size_t i = 0; i < agentNames.size(); ++i)
     {
+        if (8000 + i > std::numeric_limits<uint16_t>::max())
+        {
+            throw std::runtime_error("Port number exceeds uint16_t range");
+        }
         agentStates.emplace_back(agentNames[i], "127.0.0.1:" + std::to_string(8000 + i));
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for (size_t i = 0; i < agentNames.size(); ++i)
{
agentStates.emplace_back(agentNames[i], "127.0.0.1:" + std::to_string(8000 + i));
}
for (size_t i = 0; i < agentNames.size(); ++i)
{
if (8000 + i > std::numeric_limits<uint16_t>::max())
{
throw std::runtime_error("Port number exceeds uint16_t range");
}
agentStates.emplace_back(
agentNames[i],
"127.0.0.1:" + std::to_string(8000 + i)
);
}


kv_cache::CommState commState(std::move(agentStates), rank);

// Create DataTransceiverState
DataTransceiverState state(std::move(cacheState), std::move(commState));

// Serialize and return the serialized data
return Serialization::serialize(state);
}

} // namespace tensorrt_llm::executor
24 changes: 24 additions & 0 deletions cpp/include/tensorrt_llm/executor/version.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

// THIS FILE IS AUTO GENERATED FROM cmake/templates/version.h. DO NOT EDIT.

namespace tensorrt_llm::executor
{
// Version string of the TensorRT-LLM executor API; the value is substituted by CMake from
// cmake/templates/version.h at configure time (hence the DO-NOT-EDIT notice above).
static auto constexpr kTensorRtLlmVersion = "1.1.0rc0";
}
17 changes: 17 additions & 0 deletions cpp/tensorrt_llm/batch_manager/dataTransceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "tensorrt_llm/batch_manager/runtimeBuffers.h"
#include "tensorrt_llm/common/envUtils.h"
#include "tensorrt_llm/common/logger.h"
#include "tensorrt_llm/common/tllmException.h"
#include "tensorrt_llm/common/utils.h"
#include "tensorrt_llm/runtime/utils/mpiUtils.h"
#include <future>
Expand Down Expand Up @@ -190,6 +191,13 @@ class DataResponder::Impl
mSender->release(id);
resp.mPromise.set_value();
}
catch (tensorrt_llm::common::RequestSpecificException const& e)
{
TLLM_LOG_ERROR("Exception in sendAndRemoveResponse: %s ", e.what());
auto new_exception
= NEW_TLLM_REQUEST_SPECIFIC_EXCEPTION_WITH_ERROR_CODE(id, e.getErrorCode(), "%s", e.what());
resp.mPromise.set_exception(std::make_exception_ptr(new_exception));
}
catch (std::exception const& e)
{
TLLM_LOG_ERROR("Exception in sendAndRemoveResponse: %s ", e.what());
Expand Down Expand Up @@ -496,6 +504,15 @@ class DataRequester::Impl
requestSync(*requestAndPromise.mRequest);
requestAndPromise.mPromise->set_value();
}
catch (tensorrt_llm::common::RequestSpecificException const& err)
{
TLLM_LOG_ERROR("Exception in DataRequester request(): request id:%ld , request context id:%ld : %s",
requestAndPromise.mRequest->mRequestId,
requestAndPromise.mRequest->getContextPhaseParams().value().getReqId(), err.what());
auto new_exception = NEW_TLLM_REQUEST_SPECIFIC_EXCEPTION_WITH_ERROR_CODE(
requestAndPromise.mRequest->mRequestId, err.getErrorCode(), "%s", err.what());
requestAndPromise.mPromise->set_exception(std::make_exception_ptr(new_exception));
}
catch (std::exception const& err)
{
TLLM_LOG_ERROR("Exception in DataRequester request(): request id:%ld , request context id:%ld : %s",
Expand Down
20 changes: 18 additions & 2 deletions cpp/tensorrt_llm/batch_manager/dataTransceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,28 @@ class TransferSession

// Sends `size` bytes starting at `data` over the connection at index `idx`.
// Any failure from the underlying transport is rethrown as a RequestSpecificException
// tagged as a network error; the request ID is unknown at this layer.
void send(size_t idx, void const* data, size_t size)
{
    try
    {
        mConnections.at(idx)->send(mDataContext, data, size);
    }
    catch (std::exception const& transportError)
    {
        throw common::RequestSpecificException(__FILE__, __LINE__, transportError.what(),
            common::kUNKNOWN_REQUEST_ID, common::RequestErrorCode::kNETWORK_ERROR);
    }
}

// Receives `size` bytes into `data` from the connection at index `idx`.
// Any failure from the underlying transport is rethrown as a RequestSpecificException
// tagged as a network error; the request ID is unknown at this layer.
void recv(size_t idx, void* data, size_t size)
{
    try
    {
        mConnections.at(idx)->recv(mDataContext, data, size);
    }
    catch (std::exception const& transportError)
    {
        throw common::RequestSpecificException(__FILE__, __LINE__, transportError.what(),
            common::kUNKNOWN_REQUEST_ID, common::RequestErrorCode::kNETWORK_ERROR);
    }
}

[[nodiscard]] LlmRequest const& getLlmRequest() const
Expand Down
21 changes: 21 additions & 0 deletions cpp/tensorrt_llm/common/tllmException.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,25 @@ std::string TllmException::demangle(char const* name)
#endif
}

/// @brief Constructs the exception, embedding the request ID, error code, and source location
/// into the what() message.
RequestSpecificException::RequestSpecificException(
    char const* file, std::size_t line, char const* msg, uint64_t requestID, RequestErrorCode errorCode)
    // Build the message with std::to_string instead of a printf-style format string: the previous
    // "%lu" conversion is not portable for uint64_t (it is wrong on LLP64 platforms such as
    // 64-bit Windows, where unsigned long is 32 bits). The resulting text is unchanged:
    //   "<msg> (Request ID: <id>, Error Code: <code>) (<file>:<line>)"
    : std::runtime_error{std::string{msg} + " (Request ID: " + std::to_string(requestID)
        + ", Error Code: " + std::to_string(static_cast<uint32_t>(errorCode)) + ") (" + file + ":"
        + std::to_string(line) + ")"}
    , mRequestID{requestID}
    , mErrorCode{errorCode}
{
}
Comment on lines +109 to +116
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Use portable format specifiers for cross-platform compatibility.

The format specifier %lu for uint64_t is not portable across different platforms. Use the appropriate format macro from <cinttypes>.

Add the include at the top of the file:

#include <cinttypes>

Then update the format string:

-        "%s (Request ID: %lu, Error Code: %u) (%s:%zu)", msg, requestID, static_cast<uint32_t>(errorCode), file, line)}
+        "%s (Request ID: %" PRIu64 ", Error Code: %u) (%s:%zu)", msg, requestID, static_cast<uint32_t>(errorCode), file, line)}
🤖 Prompt for AI Agents
In cpp/tensorrt_llm/common/tllmException.cpp around lines 109 to 116, the format
specifier %lu used for the uint64_t requestID is not portable across platforms.
To fix this, include <cinttypes> at the top of the file and replace %lu with the
portable macro PRIu64 in the format string to ensure cross-platform
compatibility.


// Defaulted out-of-line so the definition stays in this translation unit.
RequestSpecificException::~RequestSpecificException() noexcept = default;

// Returns the ID of the request that triggered this exception.
uint64_t RequestSpecificException::getRequestId() const noexcept
{
    return mRequestID;
}

// Returns the categorized error code describing the failure.
RequestErrorCode RequestSpecificException::getErrorCode() const noexcept
{
    return mErrorCode;
}

} // namespace tensorrt_llm::common
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@

#include "tensorrt_llm/batch_manager/dataTransceiverImpl.h"
#include "tensorrt_llm/common/cudaUtils.h"
#include "tensorrt_llm/common/tllmException.h"
#include "tensorrt_llm/executor/cache_transmission/ucx_utils/connection.h"

namespace tensorrt_llm::executor::kv_cache
{

// Local aliases so the exception types can be referenced without the full
// tensorrt_llm::common:: qualification throughout this translation unit.
using RequestSpecificException = tensorrt_llm::common::RequestSpecificException;
using RequestErrorCode = tensorrt_llm::common::RequestErrorCode;

UcxConnection::UcxConnection(ConnectionIdType connectionId, std::shared_ptr<ucxx::Endpoint> endpoint,
UcxConnectionManager* manager, bool fromRequester)
: mConnectionId(connectionId)
Expand Down
2 changes: 2 additions & 0 deletions cpp/tensorrt_llm/pybind/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ set(SRCS
executor/executor.cpp
executor/executorConfig.cpp
executor/request.cpp
executor/dataTransceiverStateUtils.cpp
runtime/bindings.cpp
common/tllmExceptions.cpp
testing/modelSpecBinding.cpp
runtime/moeBindings.cpp
userbuffers/bindings.cpp
Expand Down
5 changes: 5 additions & 0 deletions cpp/tensorrt_llm/pybind/bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
#include "tensorrt_llm/pybind/batch_manager/cacheTransceiver.h"
#include "tensorrt_llm/pybind/batch_manager/kvCacheManager.h"
#include "tensorrt_llm/pybind/batch_manager/llmRequest.h"
#include "tensorrt_llm/pybind/common/tllmExceptions.h"
#include "tensorrt_llm/pybind/executor/bindings.h"
#include "tensorrt_llm/pybind/executor/dataTransceiverStateUtils.h"
#include "tensorrt_llm/pybind/runtime/bindings.h"
#include "tensorrt_llm/pybind/testing/modelSpecBinding.h"
#include "tensorrt_llm/pybind/userbuffers/bindings.h"
Expand Down Expand Up @@ -116,9 +118,12 @@ PYBIND11_MODULE(TRTLLM_PYBIND_MODULE, m)
auto mInternalRuntime = mInternal.def_submodule("runtime", "Runtime internal bindings");
auto mInternalTesting = mInternal.def_submodule("testing", "Testing internal bindings");
auto mInternalBatchManager = mInternal.def_submodule("batch_manager", "Batch manager internal bindings");
auto mExceptions = m.def_submodule("exceptions", "Exceptions internal bindings");

tensorrt_llm::pybind::executor::initBindings(mExecutor);
tensorrt_llm::pybind::runtime::initBindingsEarly(mInternalRuntime);
tensorrt_llm::pybind::common::initExceptionsBindings(mExceptions);
tensorrt_llm::pybind::executor::bindDataTransceiverStateUtils(mExceptions);

auto buildInfo = m.def_submodule("BuildInfo");
buildInfo.attr("ENABLE_MULTI_DEVICE") = py::int_(ENABLE_MULTI_DEVICE);
Expand Down
Loading