Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,10 @@ private void handleTimeoutFailure(long now, Throwable cause) {
log.debug("{} timed out at {} after {} attempt(s)", this, now, tries,
new Exception(prettyPrintException(cause)));
}
if (cause instanceof TimeoutException) {
// Don't mask OutOfMemoryError as TimeoutException - propagate it directly
if (cause instanceof OutOfMemoryError) {
handleFailure(cause);
} else if (cause instanceof TimeoutException) {
handleFailure(cause);
} else {
handleFailure(new TimeoutException(this + " timed out at " + now
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

/**
Expand Down Expand Up @@ -11749,4 +11750,84 @@ public void testDescribeTopicsTimeoutWhenNoBrokerResponds() throws Exception {
assertTrue(duration >= 150L && duration < 30000);
}
}

/**
* Test that OutOfMemoryError is properly propagated and not masked as TimeoutException.
* This test simulates an OOM error during response processing and verifies it propagates
* without being wrapped. This is a regression test for KAFKA-19932.
*/
@Test
public void testOutOfMemoryErrorPropagation() throws Exception {
MockTime time = new MockTime();
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, mockCluster(1, 0),
AdminClientConfig.RETRIES_CONFIG, "2",
AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "100")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

// Create a spy of MetadataResponse that throws OutOfMemoryError when topicMetadata() is accessed
// The AdminClient calls response.topicMetadata() in listTopics handleResponse(), which will trigger the OOM
MetadataResponseData data = new MetadataResponseData();
MetadataResponse realResponse = new MetadataResponse(data, ApiKeys.METADATA.latestVersion());
MetadataResponse spyResponse = spy(realResponse);

// Configure the spy to throw OutOfMemoryError when topicMetadata() is called
// This simulates an OOM occurring during response processing
doThrow(new OutOfMemoryError("Simulated OOM during response handling"))
.when(spyResponse).topicMetadata();

// Prepare the mocked response that will throw OOM
env.kafkaClient().prepareResponse(spyResponse);

// Make the listTopics call - this will internally trigger a metadata request
ListTopicsResult result = env.adminClient().listTopics(new ListTopicsOptions().timeoutMs(10000));

// The OOM should propagate as-is, not be wrapped in TimeoutException
// We expect ExecutionException wrapping the OutOfMemoryError (this is standard Future behavior)
ExecutionException exception = assertThrows(ExecutionException.class, () -> result.names().get());
assertInstanceOf(OutOfMemoryError.class, exception.getCause(),
"Expected OutOfMemoryError to be propagated, but got: " + exception.getCause());
assertEquals("Simulated OOM during response handling", exception.getCause().getMessage());
}
}

/**
* Test that OutOfMemoryError is not masked as TimeoutException even when deadline expires.
* This verifies that OOM errors take precedence over timeout handling.
* This is a regression test for KAFKA-19932.
*/
@Test
public void testOutOfMemoryErrorNotMaskedOnTimeout() throws Exception {
MockTime time = new MockTime();
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, mockCluster(1, 0),
AdminClientConfig.RETRIES_CONFIG, "0",
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000")) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());

// Create a spy that throws OOM when accessed
MetadataResponseData data = new MetadataResponseData();
MetadataResponse realResponse = new MetadataResponse(data, ApiKeys.METADATA.latestVersion());
MetadataResponse spyResponse = spy(realResponse);

// Configure spy to throw OOM when topicMetadata() is accessed
// This is the actual method called by AdminClient during response processing
doThrow(new OutOfMemoryError("Simulated OOM during response handling with timeout"))
.when(spyResponse).topicMetadata();

// Prepare the response
env.kafkaClient().prepareResponse(spyResponse);

// Make the call with a short timeout
ListTopicsResult result = env.adminClient().listTopics(new ListTopicsOptions().timeoutMs(1000));

// Advance time to exceed the timeout - this tests that even when timeout occurs,
// the OOM error is still propagated instead of being masked by TimeoutException
time.sleep(1500);

// Even with timeout expired, OOM should still propagate as-is, not wrapped in TimeoutException
ExecutionException exception = assertThrows(ExecutionException.class, () -> result.names().get());
assertInstanceOf(OutOfMemoryError.class, exception.getCause(),
"Expected OutOfMemoryError to be propagated even on timeout, but got: " + exception.getCause());
assertEquals("Simulated OOM during response handling with timeout", exception.getCause().getMessage());
}
}
}