Address more comments
zsxwing committed Apr 19, 2016
commit e8c14d60deb1c068f770d7ff3fc9bef000aff899
@@ -60,20 +60,20 @@ class FileStreamSinkLog(sqlContext: SQLContext, path: String)
/**
* If we delete the old files after compaction at once, there is a race condition in S3: other
* processes may see the old files are deleted but still cannot see the compaction file. The user
* should set a reasonable `fileExpiredTimeMS`. We will wait until then so that the compaction
* should set a reasonable `fileCleanupDelayMs`. We will wait until then so that the compaction
* file is guaranteed to be visible for all readers
*/
private val fileExpiredTimeMs = sqlContext.getConf(SQLConf.FILE_STREAM_SINK_LOG_EXPIRED_TIME)
private val fileCleanupDelayMs = sqlContext.getConf(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
Contributor:

All AWS S3 endpoints now implement create consistency: if a new object is created, then a GET made directly on it will return that object.

What can take time to appear is the aggregate file in an ls of the parent "directory", which is really a wildcard match on the path. If the processes can determine the final name of the compaction file, they can look for that file directly (getFileStatus() should suffice; open() is even better). If the compact file isn't found, they can fall back to the non-aggregate files. All that should be required is that the aggregate file is fully written (with a close() at the end of the output operation that doesn't discard any raised exception) before the original files are deleted. Adding a minor delay is a low-harm feature, but a direct check for the aggregate file is something that should be done first.
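
A minimal sketch of the direct check described above, using the Hadoop FileSystem API; the helper name, the `.compact` suffix, and the parameters are illustrative assumptions rather than code from this PR:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative only: probe the compaction ("aggregate") file directly instead of
// relying on an eventually consistent listing of the parent prefix.
object CompactionProbe {
  /** Paths a reader should consult: the compaction file if present, else the batch files. */
  def resolve(
      fs: FileSystem,
      metadataPath: Path,
      compactionBatchId: Long,
      compactInterval: Int): Seq[Path] = {
    val compactFile = new Path(metadataPath, s"$compactionBatchId.compact")
    if (fs.exists(compactFile)) {
      // A GET on a freshly created S3 key is consistent, so probing the exact path
      // works even when a listing of the parent "directory" does not yet show it.
      Seq(compactFile)
    } else {
      // Fall back to the individual batch files the compaction would replace.
      (math.max(0L, compactionBatchId - compactInterval) until compactionBatchId)
        .map(id => new Path(metadataPath, id.toString))
    }
  }
}
```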

Member Author:

@steveloughran thanks for pointing it out. I updated the code. Now it tries to access the next compaction/aggregate file directly. However, a cleanup delay is still helpful to avoid a livelock.


private val isDeletingExpiredLog = sqlContext.getConf(SQLConf.FILE_STREAM_SINK_LOG_DELETE)
private val isDeletingExpiredLog = sqlContext.getConf(SQLConf.FILE_SINK_LOG_DELETION)

private val compactLength = sqlContext.getConf(SQLConf.FILE_STREAM_SINK_LOG_COMPACT_LEN)
require(compactLength > 0,
s"Please set ${SQLConf.FILE_STREAM_SINK_LOG_COMPACT_LEN.key} (was $compactLength) " +
private val compactInterval = sqlContext.getConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
require(compactInterval > 0,
s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
"to a positive value.")

override def batchIdToPath(batchId: Long): Path = {
if (isCompactionBatch(batchId, compactLength)) {
if (isCompactionBatch(batchId, compactInterval)) {
new Path(metadataPath, s"$batchId$COMPACT_FILE_SUFFIX")
} else {
new Path(metadataPath, batchId.toString)
@@ -110,7 +110,7 @@ class FileStreamSinkLog(sqlContext: SQLContext, path: String)
}

override def add(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
if (isCompactionBatch(batchId, compactLength)) {
if (isCompactionBatch(batchId, compactInterval)) {
compact(batchId, logs)
} else {
super.add(batchId, logs)
@@ -127,7 +127,7 @@ class FileStreamSinkLog(sqlContext: SQLContext, path: String)
// race condition.
while (true) {
Contributor:

nit: can you explain the loop?
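
A hedged sketch of the pattern the loop appears to implement: a read can race with a concurrent compaction that deletes the batch files it is about to read, so it retries against the freshly resolved latest batch. `getLatestBatchId` and `readBatches` are hypothetical stand-ins for the metadata-log calls in the real class:

```scala
// Sketch only, not the PR's exact code.
def allFilesSketch(): Array[SinkFileStatus] = {
  while (true) {
    val latestId = getLatestBatchId()              // -1 when nothing has been committed yet
    if (latestId < 0) return Array.empty
    val startId = getAllValidBatches(latestId, compactInterval).head
    try {
      // May throw FileNotFoundException if a concurrent compaction deleted a batch file.
      return compactLogs(readBatches(startId, latestId)).toArray
    } catch {
      case _: java.io.FileNotFoundException =>
        // Another process compacted and cleaned up; loop and re-resolve the latest batch.
    }
  }
  throw new IllegalStateException("unreachable")
}
```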

if (latestId >= 0) {
val startId = getAllValidBatches(latestId, compactLength)(0)
val startId = getAllValidBatches(latestId, compactInterval)(0)
try {
val logs = get(Some(startId), Some(latestId)).flatMap(_._2)
return compactLogs(logs).toArray
@@ -155,7 +155,7 @@ class FileStreamSinkLog(sqlContext: SQLContext, path: String)
* corresponding `batchId` file. It will delete expired files as well if enabled.
*/
private def compact(batchId: Long, logs: Seq[SinkFileStatus]): Boolean = {
val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactLength)
val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
val allLogs = validBatches.flatMap(batchId => get(batchId)).flatten ++ logs
if (super.add(batchId, compactLogs(allLogs))) {
if (isDeletingExpiredLog) {
@@ -172,10 +172,10 @@ class FileStreamSinkLog(sqlContext: SQLContext, path: String)
* Since all logs before `compactionBatchId` are compacted and written into the
* `compactionBatchId` log file, they can be removed. However, due to the eventual consistency of
* S3, the compaction file may not be seen by other processes at once. So we only delete files
* created `fileExpiredTimeMs` milliseconds ago.
* created `fileCleanupDelayMs` milliseconds ago.
*/
private def deleteExpiredLog(compactionBatchId: Long): Unit = {
val expiredTime = System.currentTimeMillis() - fileExpiredTimeMs
val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
fileManager.list(metadataPath, new PathFilter {
override def accept(path: Path): Boolean = {
try {
@@ -206,37 +206,37 @@ object FileStreamSinkLog {

/**
* Returns if this is a compaction batch. FileStreamSinkLog will compact old logs every
* `compactLength` commits.
* `compactInterval` commits.
*
* E.g., if `compactLength` is 3, then 2, 5, 8, ... are all compaction batches.
* E.g., if `compactInterval` is 3, then 2, 5, 8, ... are all compaction batches.
*/
def isCompactionBatch(batchId: Long, compactLength: Int): Boolean = {
(batchId + 1) % compactLength == 0
def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean = {
(batchId + 1) % compactInterval == 0
}

/**
* Returns all valid batches before the specified `compactionBatchId`. They contain all logs we
* need to do a new compaction.
Contributor:

nit: Can you put an example like the one for isCompactionBatch?

Member Author:

Added.

*
* E.g., if `compactLength` is 3 and `compactionBatchId` is 5, this method should returns
* E.g., if `compactInterval` is 3 and `compactionBatchId` is 5, this method should returns
* `Seq(2, 3, 4)` (Note: it includes the previous compaction batch 2).
*/
def getValidBatchesBeforeCompactionBatch(
compactionBatchId: Long,
compactLength: Int): Seq[Long] = {
assert(isCompactionBatch(compactionBatchId, compactLength),
compactInterval: Int): Seq[Long] = {
assert(isCompactionBatch(compactionBatchId, compactInterval),
s"$compactionBatchId is not a compaction batch")
(math.max(0, compactionBatchId - compactLength)) until compactionBatchId
(math.max(0, compactionBatchId - compactInterval)) until compactionBatchId
}

/**
* Returns all necessary logs before `batchId` (inclusive). If `batchId` is a compaction, just
* return itself. Otherwise, it will find the previous compaction batch and return all batches
* between it and `batchId`.
*/
def getAllValidBatches(batchId: Long, compactLength: Long): Seq[Long] = {
def getAllValidBatches(batchId: Long, compactInterval: Long): Seq[Long] = {
assert(batchId >= 0)
val start = math.max(0, (batchId + 1) / compactLength * compactLength - 1)
val start = math.max(0, (batchId + 1) / compactInterval * compactInterval - 1)
start to batchId
}
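
A quick worked example of how the three helpers fit together, assuming the usual `org.apache.spark.sql.execution.streaming` package; the values follow directly from the definitions above:

```scala
import org.apache.spark.sql.execution.streaming.FileStreamSinkLog._

// With compactInterval = 3, batches 2, 5, 8, ... are compaction batches.
assert(isCompactionBatch(5, compactInterval = 3))
assert(!isCompactionBatch(6, compactInterval = 3))

// Compaction batch 5 folds in everything since (and including) the previous
// compaction batch 2.
assert(getValidBatchesBeforeCompactionBatch(5, compactInterval = 3) == Seq(2L, 3L, 4L))

// Reading up to batch 7 needs only the compaction batch 5 plus the deltas 6 and 7.
assert(getAllValidBatches(7, compactInterval = 3) == Seq(5L, 6L, 7L))
```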

@@ -18,6 +18,7 @@
package org.apache.spark.sql.internal

import java.util.{NoSuchElementException, Properties}
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.collection.immutable
@@ -443,24 +444,25 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val FILE_STREAM_SINK_LOG_DELETE = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion")
val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion")
.internal()
.doc("Whether to delete the expired log files in file stream sink.")
.booleanConf
.createWithDefault(true)

val FILE_STREAM_SINK_LOG_COMPACT_LEN =
SQLConfigBuilder("spark.sql.streaming.fileSink.log.compactLen")
val FILE_SINK_LOG_COMPACT_INTERVAL =
SQLConfigBuilder("spark.sql.streaming.fileSink.log.compactInterval")
.internal()
.doc("Every how many log files is a compaction triggered.")
.doc("Number of log files after which all the previous files " +
"are compacted into the next log file.")
.intConf
.createWithDefault(10)

val FILE_STREAM_SINK_LOG_EXPIRED_TIME =
SQLConfigBuilder("spark.sql.streaming.fileSink.log.expired")
val FILE_SINK_LOG_CLEANUP_DELAY =
SQLConfigBuilder("spark.sql.streaming.fileSink.log.cleanupDelay")
.internal()
.doc("How long in milliseconds a file is guaranteed to be visible for all readers.")
Contributor:

Why do we need this? I thought the plan was to use optimistic concurrency control (i.e. just retry if there is a FileNotFoundException).

Member Author:

> Why do we need this? I thought the plan was to use optimistic concurrency control (i.e. just retry if there is a FileNotFoundException).

See my comments here: https://github.com/apache/spark/pull/12435/files#diff-e529f046ee04b9926e8dd88e131134e5R61

Contributor:

Ignore S3; look at S3N in Hadoop 2.4. Sadly, it doesn't either; I didn't fix that until 2.5 and HADOOP-9361/HADOOP-9597. Hadoop 2.4 s3n is broken in other ways; see HADOOP-10457.

To summarise: don't use s3n in Hadoop 2.4; it was the first update to a later Jets3t library and was under-tested. 2.5 fixed it, and 2.6.0 added s3a, though that's not ready for use in 2.7.

Best to do a check for existence up front (getFileStatus()), which works everywhere.

.longConf
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(3600 * 1000L) // 1 hour
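
For reference, a sketch of tightening these (internal) options by their string keys, e.g. in a shell where `sqlContext` is predefined; whether to override them outside of tests is a separate question:

```scala
// Illustrative only: the keys come from this diff; values mirror what the test suite uses.
sqlContext.setConf("spark.sql.streaming.fileSink.log.compactInterval", "3")
sqlContext.setConf("spark.sql.streaming.fileSink.log.deletion", "true")
// cleanupDelay is a time conf in milliseconds, so "0" disables the delay.
sqlContext.setConf("spark.sql.streaming.fileSink.log.cleanupDelay", "0")
```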

object Deprecated {
@@ -37,41 +37,41 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
}

test("isCompactionBatch") {
assert(false === isCompactionBatch(0, compactLength = 3))
assert(false === isCompactionBatch(1, compactLength = 3))
assert(true === isCompactionBatch(2, compactLength = 3))
assert(false === isCompactionBatch(3, compactLength = 3))
assert(false === isCompactionBatch(4, compactLength = 3))
assert(true === isCompactionBatch(5, compactLength = 3))
assert(false === isCompactionBatch(0, compactInterval = 3))
assert(false === isCompactionBatch(1, compactInterval = 3))
assert(true === isCompactionBatch(2, compactInterval = 3))
assert(false === isCompactionBatch(3, compactInterval = 3))
assert(false === isCompactionBatch(4, compactInterval = 3))
assert(true === isCompactionBatch(5, compactInterval = 3))
}

test("getValidBatchesBeforeCompactionBatch") {
intercept[AssertionError] {
getValidBatchesBeforeCompactionBatch(0, compactLength = 3)
getValidBatchesBeforeCompactionBatch(0, compactInterval = 3)
}
intercept[AssertionError] {
getValidBatchesBeforeCompactionBatch(1, compactLength = 3)
getValidBatchesBeforeCompactionBatch(1, compactInterval = 3)
}
assert(Seq(0, 1) === getValidBatchesBeforeCompactionBatch(2, compactLength = 3))
assert(Seq(0, 1) === getValidBatchesBeforeCompactionBatch(2, compactInterval = 3))
intercept[AssertionError] {
getValidBatchesBeforeCompactionBatch(3, compactLength = 3)
getValidBatchesBeforeCompactionBatch(3, compactInterval = 3)
}
intercept[AssertionError] {
getValidBatchesBeforeCompactionBatch(4, compactLength = 3)
getValidBatchesBeforeCompactionBatch(4, compactInterval = 3)
}
assert(Seq(2, 3, 4) === getValidBatchesBeforeCompactionBatch(5, compactLength = 3))
assert(Seq(2, 3, 4) === getValidBatchesBeforeCompactionBatch(5, compactInterval = 3))
}

test("getAllValidBatches") {
assert(Seq(0) === getAllValidBatches(0, compactLength = 3))
assert(Seq(0, 1) === getAllValidBatches(1, compactLength = 3))
assert(Seq(2) === getAllValidBatches(2, compactLength = 3))
assert(Seq(2, 3) === getAllValidBatches(3, compactLength = 3))
assert(Seq(2, 3, 4) === getAllValidBatches(4, compactLength = 3))
assert(Seq(5) === getAllValidBatches(5, compactLength = 3))
assert(Seq(5, 6) === getAllValidBatches(6, compactLength = 3))
assert(Seq(5, 6, 7) === getAllValidBatches(7, compactLength = 3))
assert(Seq(8) === getAllValidBatches(8, compactLength = 3))
assert(Seq(0) === getAllValidBatches(0, compactInterval = 3))
assert(Seq(0, 1) === getAllValidBatches(1, compactInterval = 3))
assert(Seq(2) === getAllValidBatches(2, compactInterval = 3))
assert(Seq(2, 3) === getAllValidBatches(3, compactInterval = 3))
assert(Seq(2, 3, 4) === getAllValidBatches(4, compactInterval = 3))
assert(Seq(5) === getAllValidBatches(5, compactInterval = 3))
assert(Seq(5, 6) === getAllValidBatches(6, compactInterval = 3))
assert(Seq(5, 6, 7) === getAllValidBatches(7, compactInterval = 3))
assert(Seq(8) === getAllValidBatches(8, compactInterval = 3))
}

test("compactLogs") {
@@ -124,7 +124,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
}

test("batchIdToPath") {
withSQLConf(SQLConf.FILE_STREAM_SINK_LOG_COMPACT_LEN.key -> "3") {
withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
withFileStreamSinkLog { sinkLog =>
assert("0" === sinkLog.batchIdToPath(0).getName)
assert("1" === sinkLog.batchIdToPath(1).getName)
@@ -137,32 +137,31 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
}

test("compact") {
withSQLConf(SQLConf.FILE_STREAM_SINK_LOG_COMPACT_LEN.key -> "3") {
withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
withFileStreamSinkLog { sinkLog =>
for (batchId <- 0 to 10) {
sinkLog.add(
batchId,
Seq(SinkFileStatus("/a/b/" + batchId, 100L, FileStreamSinkLog.ADD_ACTION)))
assert(sinkLog.allFiles() === (0 to batchId).map {
val expectedFiles = (0 to batchId).map {
id => SinkFileStatus("/a/b/" + id, 100L, FileStreamSinkLog.ADD_ACTION)
})
}
assert(sinkLog.allFiles() === expectedFiles)
if (isCompactionBatch(batchId, 3)) {
// Since batchId is a compaction batch, the batch log file should contain all logs
assert(sinkLog.get(batchId).getOrElse(Nil) === (0 to batchId).map {
id => SinkFileStatus("/a/b/" + id, 100L, FileStreamSinkLog.ADD_ACTION)
})
assert(sinkLog.get(batchId).getOrElse(Nil) === expectedFiles)
}
}
}
}
}

test("delete expired file") {
// Set FILE_STREAM_SINK_LOG_EXPIRED_TIME to 0 so that we can detect the deleting behaviour
// Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
// deterministically
withSQLConf(
SQLConf.FILE_STREAM_SINK_LOG_COMPACT_LEN.key -> "3",
SQLConf.FILE_STREAM_SINK_LOG_EXPIRED_TIME.key -> "0") {
SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0") {
withFileStreamSinkLog { sinkLog =>
val metadataPath = new File(sinkLog.metadataPath.toUri.toString)
