diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java index a9848633effa3..43824f6bf0a3c 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java @@ -985,9 +985,6 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException segmentIdsBeingCopied.add(segmentId); try { copyLogSegment(log, candidateLogSegment.logSegment, segmentId, candidateLogSegment.nextSegmentOffset); - } catch (Exception e) { - recordLagStats(log); - throw e; } finally { segmentIdsBeingCopied.remove(segmentId); } @@ -1009,6 +1006,8 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark(); logger.error("Error occurred while copying log segments of partition: {}", topicIdPartition, ex); } + } finally { + recordLagStats(log); } } @@ -1093,14 +1092,6 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, RemoteLogSegment log.updateHighestOffsetInRemoteStorage(endOffset); logger.info("Copied {} to remote storage with segment-id: {}", logFileName, copySegmentFinishedRlsm.remoteLogSegmentId()); - - recordLagStats(log); - } - - private void recordLagStats(UnifiedLog log) { - long bytesLag = log.onlyLocalLogSegmentsSize() - log.activeSegment().size(); - long segmentsLag = log.onlyLocalLogSegmentsCount() - 1; - recordLagStats(bytesLag, segmentsLag); } // VisibleForTesting @@ -1120,6 +1111,17 @@ void resetLagStats() { brokerTopicStats.recordRemoteCopyLagSegments(topic, partition, 0); } + // VisibleForTesting + void recordLagStats(UnifiedLog log) { + try { + long bytesLag = Math.max(0, log.onlyLocalLogSegmentsSize() - log.activeSegment().size()); + long segmentsLag = Math.max(0, log.onlyLocalLogSegmentsCount() - 1); + recordLagStats(bytesLag, segmentsLag); + } catch (Exception e) { + logger.debug("Failed to record lag stats for partition {}", topicIdPartition, e); + } + } + private Path toPathIfExists(File file) { return file.exists() ? file.toPath() : null; } diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java index dc80383dc668b..467bee5e12013 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java @@ -392,7 +392,7 @@ public long logStartOffset() { return logStartOffset; } - long highestOffsetInRemoteStorage() { + public long highestOffsetInRemoteStorage() { return highestOffsetInRemoteStorage; } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java index 5b0676088e1e5..35ca5539aea3b 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java @@ -3632,6 +3632,71 @@ public void testTierLagResetsToZeroOnBecomingFollower() { assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagSegments()); } + @Test + public void testCopyLagMetricsWithOnlyActiveSegment() { + LogSegment activeSegment = mock(LogSegment.class); + when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); + when(mockLog.activeSegment()).thenReturn(activeSegment); + when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(100L); + when(activeSegment.size()).thenReturn(100); + when(mockLog.onlyLocalLogSegmentsCount()).thenReturn(1L); + + remoteLogManager.onLeadershipChange( + Set.of(mockPartition(leaderTopicIdPartition)), Set.of(), topicIds); + RemoteLogManager.RLMCopyTask rlmTask = (RemoteLogManager.RLMCopyTask) remoteLogManager.rlmCopyTask(leaderTopicIdPartition); + assertNotNull(rlmTask); + + rlmTask.recordLagStats(mockLog); + assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagBytes()); + assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagSegments()); + } + + @Test + public void testCopyLagMetricsWithMultipleSegments() { + LogSegment activeSegment = mock(LogSegment.class); + when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); + when(mockLog.activeSegment()).thenReturn(activeSegment); + when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(300L); + when(activeSegment.size()).thenReturn(100); + when(mockLog.onlyLocalLogSegmentsCount()).thenReturn(3L); + + remoteLogManager.onLeadershipChange( + Set.of(mockPartition(leaderTopicIdPartition)), Set.of(), topicIds); + RemoteLogManager.RLMCopyTask rlmTask = (RemoteLogManager.RLMCopyTask) remoteLogManager.rlmCopyTask(leaderTopicIdPartition); + assertNotNull(rlmTask); + + rlmTask.recordLagStats(mockLog); + assertEquals(200, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagBytes()); + assertEquals(2, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagSegments()); + } + + @Test + public void testCopyLagMetricsAfterLeaderChangeWithHigherRemoteOffset() { + // Edge case after leader change: active segment base offset is less than highestOffsetInRemoteStorage + // This can happen when a new leader takes over and remote storage has data uploaded by the previous leader + // onlyLocalLogSegmentsSize returns 0 because active segment doesn't pass the filter (baseOffset < highestOffsetInRemoteStorage) + // Without Math.max this would be negative + LogSegment activeSegment = mock(LogSegment.class); + when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition()); + when(mockLog.activeSegment()).thenReturn(activeSegment); + when(mockLog.highestOffsetInRemoteStorage()).thenReturn(125L); + when(activeSegment.baseOffset()).thenReturn(100L); + when(mockLog.logEndOffset()).thenReturn(150L); + when(mockLog.lastStableOffset()).thenReturn(150L); + when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(0L); + when(activeSegment.size()).thenReturn(100); + when(mockLog.onlyLocalLogSegmentsCount()).thenReturn(0L); + + remoteLogManager.onLeadershipChange( + Set.of(mockPartition(leaderTopicIdPartition)), Set.of(), topicIds); + RemoteLogManager.RLMCopyTask rlmTask = (RemoteLogManager.RLMCopyTask) remoteLogManager.rlmCopyTask(leaderTopicIdPartition); + assertNotNull(rlmTask); + + rlmTask.recordLagStats(mockLog); + assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagBytes()); + assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagSegments()); + } + @Test public void testRemoteReadFetchDataInfo() throws RemoteStorageException, IOException { checkpoint.write(totalEpochEntries);