Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -985,9 +985,6 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
segmentIdsBeingCopied.add(segmentId);
try {
copyLogSegment(log, candidateLogSegment.logSegment, segmentId, candidateLogSegment.nextSegmentOffset);
} catch (Exception e) {
recordLagStats(log);
throw e;
} finally {
segmentIdsBeingCopied.remove(segmentId);
}
Expand All @@ -1009,6 +1006,8 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().mark();
logger.error("Error occurred while copying log segments of partition: {}", topicIdPartition, ex);
}
} finally {
recordLagStats(log);
}
}

Expand Down Expand Up @@ -1093,14 +1092,6 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, RemoteLogSegment
log.updateHighestOffsetInRemoteStorage(endOffset);
logger.info("Copied {} to remote storage with segment-id: {}",
logFileName, copySegmentFinishedRlsm.remoteLogSegmentId());

recordLagStats(log);
}

private void recordLagStats(UnifiedLog log) {
long bytesLag = log.onlyLocalLogSegmentsSize() - log.activeSegment().size();
long segmentsLag = log.onlyLocalLogSegmentsCount() - 1;
recordLagStats(bytesLag, segmentsLag);
}

// VisibleForTesting
Expand All @@ -1120,6 +1111,17 @@ void resetLagStats() {
brokerTopicStats.recordRemoteCopyLagSegments(topic, partition, 0);
}

// VisibleForTesting
void recordLagStats(UnifiedLog log) {
try {
long bytesLag = Math.max(0, log.onlyLocalLogSegmentsSize() - log.activeSegment().size());
long segmentsLag = Math.max(0, log.onlyLocalLogSegmentsCount() - 1);
recordLagStats(bytesLag, segmentsLag);
} catch (Exception e) {
logger.debug("Failed to record lag stats for partition {}", topicIdPartition, e);
}
}

private Path toPathIfExists(File file) {
return file.exists() ? file.toPath() : null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ public long logStartOffset() {
return logStartOffset;
}

long highestOffsetInRemoteStorage() {
public long highestOffsetInRemoteStorage() {
return highestOffsetInRemoteStorage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3632,6 +3632,71 @@ public void testTierLagResetsToZeroOnBecomingFollower() {
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagSegments());
}

@Test
public void testCopyLagMetricsWithOnlyActiveSegment() {
LogSegment activeSegment = mock(LogSegment.class);
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
when(mockLog.activeSegment()).thenReturn(activeSegment);
when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(100L);
when(activeSegment.size()).thenReturn(100);
when(mockLog.onlyLocalLogSegmentsCount()).thenReturn(1L);

remoteLogManager.onLeadershipChange(
Set.of(mockPartition(leaderTopicIdPartition)), Set.of(), topicIds);
RemoteLogManager.RLMCopyTask rlmTask = (RemoteLogManager.RLMCopyTask) remoteLogManager.rlmCopyTask(leaderTopicIdPartition);
assertNotNull(rlmTask);

rlmTask.recordLagStats(mockLog);
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagBytes());
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagSegments());
}

@Test
public void testCopyLagMetricsWithMultipleSegments() {
LogSegment activeSegment = mock(LogSegment.class);
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
when(mockLog.activeSegment()).thenReturn(activeSegment);
when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(300L);
when(activeSegment.size()).thenReturn(100);
when(mockLog.onlyLocalLogSegmentsCount()).thenReturn(3L);

remoteLogManager.onLeadershipChange(
Set.of(mockPartition(leaderTopicIdPartition)), Set.of(), topicIds);
RemoteLogManager.RLMCopyTask rlmTask = (RemoteLogManager.RLMCopyTask) remoteLogManager.rlmCopyTask(leaderTopicIdPartition);
assertNotNull(rlmTask);

rlmTask.recordLagStats(mockLog);
assertEquals(200, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagBytes());
assertEquals(2, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagSegments());
}

@Test
public void testCopyLagMetricsAfterLeaderChangeWithHigherRemoteOffset() {
// Edge case after leader change: active segment base offset is less than highestOffsetInRemoteStorage
// This can happen when a new leader takes over and remote storage has data uploaded by the previous leader
// onlyLocalLogSegmentsSize returns 0 because active segment doesn't pass the filter (baseOffset < highestOffsetInRemoteStorage)
// Without Math.max this would be negative
LogSegment activeSegment = mock(LogSegment.class);
when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
when(mockLog.activeSegment()).thenReturn(activeSegment);
when(mockLog.highestOffsetInRemoteStorage()).thenReturn(125L);
when(activeSegment.baseOffset()).thenReturn(100L);
when(mockLog.logEndOffset()).thenReturn(150L);
when(mockLog.lastStableOffset()).thenReturn(150L);
when(mockLog.onlyLocalLogSegmentsSize()).thenReturn(0L);
when(activeSegment.size()).thenReturn(100);
when(mockLog.onlyLocalLogSegmentsCount()).thenReturn(0L);

remoteLogManager.onLeadershipChange(
Set.of(mockPartition(leaderTopicIdPartition)), Set.of(), topicIds);
RemoteLogManager.RLMCopyTask rlmTask = (RemoteLogManager.RLMCopyTask) remoteLogManager.rlmCopyTask(leaderTopicIdPartition);
assertNotNull(rlmTask);

rlmTask.recordLagStats(mockLog);
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagBytes());
assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagSegments());
}

@Test
public void testRemoteReadFetchDataInfo() throws RemoteStorageException, IOException {
checkpoint.write(totalEpochEntries);
Expand Down
Loading