@@ -163,6 +163,16 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
    batchAdded
  }

  /**
   * CompactibleFileStreamLog maintains logs by itself, and manual purging might break its
   * internal state, especially when the latest compaction batch is removed.
   *
   * To simplify the situation, this method just throws UnsupportedOperationException
   * regardless of the given parameter, and lets CompactibleFileStreamLog handle purging by
   * itself.
   */
  override def purge(thresholdBatchId: Long): Unit = throw new UnsupportedOperationException(
    s"Cannot purge as it might break internal state.")

  /**
   * Compacts all logs before `batchId` plus the provided `logs`, and writes them into the
   * corresponding `batchId` file. It will delete expired files as well if enabled.
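
The doc comment above notes that manual purging might break internal state, specifically by
removing the latest compaction batch. The sketch below illustrates that failure mode. It is a
minimal, self-contained model, not Spark's implementation: the object name, the mutable
SortedMap standing in for the file-backed log, and the simplified add/allFiles/isCompactionBatch
logic are all hypothetical stand-ins.

import scala.collection.mutable

// Minimal model of a compaction-based metadata log (hypothetical, for illustration only).
object PurgeBreaksCompactionDemo {
  val compactInterval = 2
  // batch id -> entries; a sorted in-memory map stands in for the file-backed log
  val log = mutable.SortedMap[Long, Seq[String]]()

  // With interval 2, batches 1, 3, 5, ... are compaction batches
  def isCompactionBatch(batchId: Long): Boolean = (batchId + 1) % compactInterval == 0

  def add(batchId: Long, entries: Seq[String]): Unit = {
    // A compaction batch stores the union of all earlier entries plus its own
    val merged =
      if (isCompactionBatch(batchId)) log.values.flatten.toSeq ++ entries
      else entries
    log(batchId) = merged
  }

  // Reading all files requires the latest compaction batch plus every later batch
  def allFiles(): Seq[String] = {
    val latestCompaction = log.keys.filter(isCompactionBatch).lastOption.getOrElse(
      throw new IllegalStateException("latest compaction batch is missing"))
    log.range(latestCompaction, Long.MaxValue).values.flatten.toSeq
  }

  def main(args: Array[String]): Unit = {
    add(0, Seq("some_path_0"))
    add(1, Seq("some_path_1")) // compaction batch: now holds paths 0 and 1
    add(2, Seq("some_path_2"))

    // A naive purge(2) deletes batches 0 and 1, including compaction batch 1 ...
    log.remove(0)
    log.remove(1)

    // ... so this throws IllegalStateException: the latest compaction batch is gone
    allFiles()
  }
}

This is exactly the scenario the new test below guards against: rather than risk deleting a
compaction batch, CompactibleFileStreamLog rejects manual purge outright and relies on its own
compaction-time cleanup ("It will delete expired files as well if enabled").
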
@@ -232,6 +232,29 @@ class CompactibleFileStreamLogSuite extends SparkFunSuite with SharedSQLContext
      })
  }

test("prevent removing metadata files via method purge") {
withFakeCompactibleFileStreamLog(
fileCleanupDelayMs = 10000,
defaultCompactInterval = 2,
defaultMinBatchesToRetain = 3,
compactibleLog => {
// compaction batches: 1
compactibleLog.add(0, Array("some_path_0"))
compactibleLog.add(1, Array("some_path_1"))
compactibleLog.add(2, Array("some_path_2"))

val exc = intercept[UnsupportedOperationException] {
compactibleLog.purge(2)
}
assert(exc.getMessage.contains("Cannot purge as it might break internal state"))

        // The line below would fail with IllegalStateException if purge were not prevented:
        // - purge(2) would delete batches 0 and 1, and batch 1 is a compaction batch
        // - allFiles() would then read batch 1 (the latest compaction batch) and batch 2,
        //   but batch 1 has been deleted
        compactibleLog.allFiles()
      })
  }

  private def withFakeCompactibleFileStreamLog(
      fileCleanupDelayMs: Long,
      defaultCompactInterval: Int,