Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/core/azure-core-amqp/test/ut/session_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests {
connection.Open();

{
constexpr const size_t sessionCount = 10;
constexpr const size_t sessionCount = 30;
GTEST_LOG_(INFO) << "Opening " << sessionCount << " sessions.";
std::vector<Session> sessions;
for (size_t i = 0; i < sessionCount; i += 1)
Expand Down
95 changes: 55 additions & 40 deletions sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ typedef struct CHANNEL_TABLE_TABLE_TAG
// When a channel is freed, the entry is set back to ENDPOINT_UNUSED_CHANNEL.
//
CHANNEL_TABLE_ENTRY* channels;
uint16_t capacity;
uint16_t size;
uint32_t capacity;
uint32_t size;
} CHANNEL_TABLE;

typedef struct CONNECTION_INSTANCE_TAG
Expand Down Expand Up @@ -173,49 +173,64 @@ static int channel_table_allocate(CHANNEL_TABLE* channel_table, uint16_t* h)
else
{
*h = 0;

if (channel_table->size == channel_table->capacity)
{
CHANNEL_TABLE_ENTRY* new_handles = (CHANNEL_TABLE_ENTRY*)realloc(
channel_table->channels, sizeof(CHANNEL_TABLE_ENTRY) * channel_table->capacity * 2);
if (new_handles == NULL)
{
LogError("Cannot reallocate memory for handle table");
result = MU_FAILURE;
}
else
{
channel_table->channels = new_handles;
// Ensure the newly resized handle table is empty.
for (uint16_t i = channel_table->size; i < channel_table->capacity * 2; i++)
if (channel_table->capacity == UINT16_MAX)
{
channel_table->channels[i].outgoing_channel = ENDPOINT_UNUSED_CHANNEL;
channel_table->channels[i].incoming_channel = ENDPOINT_UNUSED_CHANNEL;
channel_table->channels[i].is_endpoint_live = false;
LogError("Cannot allocate more channels");
result = MU_FAILURE;
}
else
{
uint32_t new_capacity = channel_table->capacity + INITIAL_CHANNEL_TABLE_CAPACITY;
if (channel_table->capacity >= UINT16_MAX)
{
new_capacity = UINT16_MAX;
}
CHANNEL_TABLE_ENTRY* new_channels = (CHANNEL_TABLE_ENTRY*)realloc(
channel_table->channels, sizeof(CHANNEL_TABLE_ENTRY) * new_capacity);
if (new_channels == NULL)
{
LogError("Cannot reallocate memory for handle table");
result = MU_FAILURE;
}
else
{
channel_table->channels = new_channels;
channel_table->capacity = new_capacity;
// Ensure the newly resized part of the handle table is empty.
uint32_t i = channel_table->size;
do
{
channel_table->channels[i].outgoing_channel = ENDPOINT_UNUSED_CHANNEL;
channel_table->channels[i].incoming_channel = ENDPOINT_UNUSED_CHANNEL;
channel_table->channels[i].is_endpoint_live = false;
i++;
} while (i < channel_table->capacity);
}
}
channel_table->capacity *= 2;
}
}
if (!result)
{
uint16_t i = 0;
for (; i < channel_table->size; i++)
{
if (channel_table->channels[i].outgoing_channel == ENDPOINT_UNUSED_CHANNEL)
// Look for an empty slot in the table. If none is found, add to the end of the table.
uint32_t i = 0;
for (; i < channel_table->size; i++)
{
if (channel_table->channels[i].outgoing_channel == ENDPOINT_UNUSED_CHANNEL)
{
break;
}
}
// If we didn't find a hole, we need to increase the size of the table by 1.
if (i == channel_table->size)
{
*h = i;
channel_table->channels[i].outgoing_channel = i;
break;
channel_table->size++;
}
}
// If we didn't find a hole, add this to the end of the table.
if (i == channel_table->size)
{
*h = channel_table->size;
channel_table->channels[channel_table->size].outgoing_channel = channel_table->size;
channel_table->channels[channel_table->size].is_endpoint_live = true;
channel_table->size++;
}

*h = (uint16_t)i;
channel_table->channels[i].outgoing_channel = (uint16_t)i;
channel_table->channels[i].is_endpoint_live = true;

}
}

Expand Down Expand Up @@ -305,7 +320,7 @@ static int channel_table_find_outgoing_channel_from_incoming_channel(CHANNEL_TAB
}
if (i == channel_table->size)
{
LogError("Could not find incoming channel %d", incoming_channel);
LogError("Could not find incoming channel %hu", incoming_channel);
result = MU_FAILURE;
}

Expand Down Expand Up @@ -468,7 +483,7 @@ static void log_incoming_frame(uint16_t channel, AMQP_VALUE performative)
else
{
char* performative_as_string;
LOG(AZ_LOG_TRACE, 0, "%d:", channel)
LOG(AZ_LOG_TRACE, 0, "%hu:", channel)
LOG(AZ_LOG_TRACE, 0, "<- ");
LOG(AZ_LOG_TRACE, 0, "%s", (char*)get_frame_type_as_string(descriptor));
performative_as_string = NULL;
Expand All @@ -494,7 +509,7 @@ static void log_outgoing_frame(uint16_t channel, AMQP_VALUE performative)
else
{
char* performative_as_string;
LOG(AZ_LOG_TRACE, 0, "%d:", channel)
LOG(AZ_LOG_TRACE, 0, "%hu:", channel)
LOG(AZ_LOG_TRACE, 0, "-> ");
LOG(AZ_LOG_TRACE, 0, "%s", (char*)get_frame_type_as_string(descriptor));
performative_as_string = NULL;
Expand Down Expand Up @@ -1290,7 +1305,7 @@ static void on_amqp_frame_received(void* context, uint16_t channel, AMQP_VALUE p
ENDPOINT_INSTANCE* session_endpoint = find_session_endpoint_by_outgoing_channel(connection, remote_channel);
if (session_endpoint == NULL)
{
LogError("Cannot find session endpoint corresponding to remote channel %d", remote_channel);
LogError("Cannot find session endpoint corresponding to remote channel %hu", remote_channel);
}
else
{
Expand Down