diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index fa9440dfb9102..1691897c376bc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -955,7 +955,9 @@ private void handleTimeoutFailure(long now, Throwable cause) {
                 log.debug("{} timed out at {} after {} attempt(s)", this, now, tries,
                     new Exception(prettyPrintException(cause)));
             }
-            if (cause instanceof TimeoutException) {
+            // An OutOfMemoryError must reach the caller unchanged; wrapping it in a
+            // TimeoutException would hide the real failure behind a misleading timeout.
+            if (cause instanceof OutOfMemoryError || cause instanceof TimeoutException) {
                 handleFailure(cause);
             } else {
                 handleFailure(new TimeoutException(this + " timed out at " + now
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 93f6daaf8ec8f..5b6c1b156d65c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -325,6 +325,7 @@
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 /**
@@ -11749,4 +11750,74 @@ public void testDescribeTopicsTimeoutWhenNoBrokerResponds() throws Exception {
             assertTrue(duration >= 150L && duration < 30000);
         }
     }
+
+    /**
+     * Builds an empty {@link MetadataResponse} spy whose {@code topicMetadata()} accessor throws an
+     * {@link OutOfMemoryError} with the given message, simulating an OOM during response handling.
+     */
+    private static MetadataResponse metadataResponseThrowingOom(String message) {
+        MetadataResponse response = spy(
+            new MetadataResponse(new MetadataResponseData(), ApiKeys.METADATA.latestVersion()));
+        doThrow(new OutOfMemoryError(message)).when(response).topicMetadata();
+        return response;
+    }
+
+    /**
+     * Verifies that an {@link OutOfMemoryError} raised while a response is being handled reaches the
+     * caller unchanged instead of being reported as a {@code TimeoutException}.
+     * Regression test for KAFKA-19932.
+     */
+    @Test
+    public void testOutOfMemoryErrorPropagation() throws Exception {
+        String oomMessage = "Simulated OOM during response handling";
+        MockTime time = new MockTime();
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, mockCluster(1, 0),
+                AdminClientConfig.RETRIES_CONFIG, "2",
+                AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "100")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // listTopics reads topicMetadata() from the metadata response, so the spy fails there.
+            env.kafkaClient().prepareResponse(metadataResponseThrowingOom(oomMessage));
+
+            ListTopicsResult result = env.adminClient().listTopics(new ListTopicsOptions().timeoutMs(10000));
+
+            // The future wraps the failure in an ExecutionException; its cause must be the original
+            // OutOfMemoryError rather than a TimeoutException.
+            ExecutionException exception = assertThrows(ExecutionException.class, () -> result.names().get());
+            assertInstanceOf(OutOfMemoryError.class, exception.getCause(),
+                "Expected OutOfMemoryError to be propagated, but got: " + exception.getCause());
+            assertEquals(oomMessage, exception.getCause().getMessage());
+        }
+    }
+
+    /**
+     * Verifies that an {@link OutOfMemoryError} is still reported as-is when the call's deadline has
+     * also expired, i.e. that timeout handling does not replace it with a {@code TimeoutException}.
+     * Regression test for KAFKA-19932.
+     */
+    @Test
+    public void testOutOfMemoryErrorNotMaskedOnTimeout() throws Exception {
+        String oomMessage = "Simulated OOM during response handling with timeout";
+        MockTime time = new MockTime();
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, mockCluster(1, 0),
+                AdminClientConfig.RETRIES_CONFIG, "0",
+                AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(metadataResponseThrowingOom(oomMessage));
+
+            ListTopicsResult result = env.adminClient().listTopics(new ListTopicsOptions().timeoutMs(1000));
+
+            // Move the mock clock past the 1000 ms deadline.
+            // NOTE(review): the clock is advanced only after the call has been issued, so whether the
+            // response is handled before or after the deadline may depend on thread scheduling -
+            // confirm this reliably exercises the timeout path.
+            time.sleep(1500);
+
+            ExecutionException exception = assertThrows(ExecutionException.class, () -> result.names().get());
+            assertInstanceOf(OutOfMemoryError.class, exception.getCause(),
+                "Expected OutOfMemoryError to be propagated even on timeout, but got: " + exception.getCause());
+            assertEquals(oomMessage, exception.getCause().getMessage());
+        }
+    }
 }