From 43cc1cec08b649e3d748f4a4f470fca834b66bdb Mon Sep 17 00:00:00 2001 From: emres Date: Thu, 9 Apr 2015 11:58:05 +0200 Subject: [PATCH 1/7] SPARK-3276 Added a new configuration parameter spark.streaming.minRemember duration, with a default value of 1 minute. --- .../spark/streaming/dstream/FileInputDStream.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 66d519171fd7..eceae2ed60ee 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import org.apache.spark.SerializableWritable +import org.apache.spark.{SparkConf, SerializableWritable} import org.apache.spark.rdd.{RDD, UnionRDD} import org.apache.spark.streaming._ import org.apache.spark.util.{TimeStampedHashMap, Utils} @@ -331,11 +331,13 @@ private[streaming] object FileInputDStream { /** - * Minimum duration of remembering the information of selected files. Files with mod times - * older than this "window" of remembering will be ignored. So if new files are visible - * within this window, then the file will get selected in the next batch. + * Minimum duration of remembering the information of selected files. Defaults to 1 minute. + * + * Files with mod times older than this "window" of remembering will be ignored. So if new + * files are visible within this window, then the file will get selected in the next batch. */ - private val MIN_REMEMBER_DURATION = Minutes(1) + private val minRememberDuration = new SparkConf().get("spark.streaming.minRememberDuration", "1") + private val MIN_REMEMBER_DURATION = Minutes(minRememberDuration.toLong) def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") From daccc827a79862f47aaf4d5083754fa47033caab Mon Sep 17 00:00:00 2001 From: emres Date: Thu, 9 Apr 2015 13:41:57 +0200 Subject: [PATCH 2/7] SPARK-3276 Changed the property name to reflect the unit of value and reduced number of fields. * Changed the property name to spark.streaming.minRememberDurationMin to reflect the unit of value (minutes). * Deleted the constant MIN_REMEMBER_DURATION, because now minRememberDurationMin is serving the same purpose. --- .../spark/streaming/dstream/FileInputDStream.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index eceae2ed60ee..61570339b1a9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -63,7 +63,7 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils} * the streaming app. * - If a file is to be visible in the directory listings, it must be visible within a certain * duration of the mod time of the file. This duration is the "remember window", which is set to - * 1 minute (see `FileInputDStream.MIN_REMEMBER_DURATION`). Otherwise, the file will never be + * 1 minute (see `FileInputDStream.minRememberDurationMin`). Otherwise, the file will never be * selected as the mod time will be less than the ignore threshold when it becomes visible. * - Once a file is visible, the mod time cannot change. If it does due to appends, then the * processing semantics are undefined. @@ -336,16 +336,17 @@ object FileInputDStream { * Files with mod times older than this "window" of remembering will be ignored. So if new * files are visible within this window, then the file will get selected in the next batch. */ - private val minRememberDuration = new SparkConf().get("spark.streaming.minRememberDuration", "1") - private val MIN_REMEMBER_DURATION = Minutes(minRememberDuration.toLong) + private val minRememberDurationMin = Minutes(new SparkConf() + .get("spark.streaming.minRememberDurationMin", "1") + .toLong) def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") /** * Calculate the number of last batches to remember, such that all the files selected in - * at least last MIN_REMEMBER_DURATION duration can be remembered. + * at least last minRememberDurationMin duration can be remembered. */ def calculateNumBatchesToRemember(batchDuration: Duration): Int = { - math.ceil(MIN_REMEMBER_DURATION.milliseconds.toDouble / batchDuration.milliseconds).toInt + math.ceil(minRememberDurationMin.milliseconds.toDouble / batchDuration.milliseconds).toInt } } From bfe0acbb24029d70a184a6d6443bfdbf6c087114 Mon Sep 17 00:00:00 2001 From: emres Date: Mon, 13 Apr 2015 11:29:44 +0200 Subject: [PATCH 3/7] SPARK-3276 Moved the minRememberDurationMin to the class * Moved the minRememberDurationMin to the class so that it can use the existing Spark context * Refactored calculateNumBatchesToRemember to take minRememberDurationMin as a parameter --- .../streaming/dstream/FileInputDStream.scala | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 61570339b1a9..948d16a6e702 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -80,6 +80,16 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( private val serializableConfOpt = conf.map(new SerializableWritable(_)) + /** + * Minimum duration of remembering the information of selected files. Defaults to 1 minute. + * + * Files with mod times older than this "window" of remembering will be ignored. So if new + * files are visible within this window, then the file will get selected in the next batch. + */ + private val minRememberDurationMin = Minutes(ssc.sparkContext.getConf + .get("spark.streaming.minRememberDurationMin", "1") + .toLong) + // This is a def so that it works during checkpoint recovery: private def clock = ssc.scheduler.clock @@ -95,7 +105,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( * This would allow us to filter away not-too-old files which have already been recently * selected and processed. */ - private val numBatchesToRemember = FileInputDStream.calculateNumBatchesToRemember(slideDuration) + private val numBatchesToRemember = FileInputDStream + .calculateNumBatchesToRemember(slideDuration, minRememberDurationMin) private val durationToRemember = slideDuration * numBatchesToRemember remember(durationToRemember) @@ -330,23 +341,14 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( private[streaming] object FileInputDStream { - /** - * Minimum duration of remembering the information of selected files. Defaults to 1 minute. - * - * Files with mod times older than this "window" of remembering will be ignored. So if new - * files are visible within this window, then the file will get selected in the next batch. - */ - private val minRememberDurationMin = Minutes(new SparkConf() - .get("spark.streaming.minRememberDurationMin", "1") - .toLong) - def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") /** * Calculate the number of last batches to remember, such that all the files selected in * at least last minRememberDurationMin duration can be remembered. */ - def calculateNumBatchesToRemember(batchDuration: Duration): Int = { + def calculateNumBatchesToRemember(batchDuration: Duration, + minRememberDurationMin: Duration): Int = { math.ceil(minRememberDurationMin.milliseconds.toDouble / batchDuration.milliseconds).toInt } } From 1c53ba9ed84b2e2a5129f4cedd0574060704c9d7 Mon Sep 17 00:00:00 2001 From: emres Date: Mon, 13 Apr 2015 12:00:52 +0200 Subject: [PATCH 4/7] SPARK-3276 Started to use ssc.conf rather than ssc.sparkContext.getConf, and also getLong method directly. --- .../apache/spark/streaming/dstream/FileInputDStream.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 948d16a6e702..e9fb3ec163ec 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -86,9 +86,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( * Files with mod times older than this "window" of remembering will be ignored. So if new * files are visible within this window, then the file will get selected in the next batch. */ - private val minRememberDurationMin = Minutes(ssc.sparkContext.getConf - .get("spark.streaming.minRememberDurationMin", "1") - .toLong) + private val minRememberDurationMin = Minutes(ssc.conf + .getLong("spark.streaming.minRememberDurationMin", 1L)) // This is a def so that it works during checkpoint recovery: private def clock = ssc.scheduler.clock From c9d58ca361cf7f9fc3253fc27486b65da1673c96 Mon Sep 17 00:00:00 2001 From: emres Date: Mon, 13 Apr 2015 14:19:24 +0200 Subject: [PATCH 5/7] SPARK-3276 Minor code re-formatting. --- .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index e9fb3ec163ec..1e355b394aab 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -86,8 +86,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( * Files with mod times older than this "window" of remembering will be ignored. So if new * files are visible within this window, then the file will get selected in the next batch. */ - private val minRememberDurationMin = Minutes(ssc.conf - .getLong("spark.streaming.minRememberDurationMin", 1L)) + private val minRememberDurationMin = + Minutes(ssc.conf.getLong("spark.streaming.minRememberDurationMin", 1L)) // This is a def so that it works during checkpoint recovery: private def clock = ssc.scheduler.clock From affee1d4e31fdaed250ad2133780bf7af65a83f8 Mon Sep 17 00:00:00 2001 From: emres Date: Mon, 13 Apr 2015 14:52:11 +0200 Subject: [PATCH 6/7] SPARK-3276 Changed the property name and variable name for minRememberDuration * switched back to using spark.streaming.minRememberDuration * renamed minRememberDurationMin to minRememberDuration --- .../streaming/dstream/FileInputDStream.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 1e355b394aab..1b093b12f627 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -63,7 +63,7 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils} * the streaming app. * - If a file is to be visible in the directory listings, it must be visible within a certain * duration of the mod time of the file. This duration is the "remember window", which is set to - * 1 minute (see `FileInputDStream.minRememberDurationMin`). Otherwise, the file will never be + * 1 minute (see `FileInputDStream.minRememberDuration`). Otherwise, the file will never be * selected as the mod time will be less than the ignore threshold when it becomes visible. * - Once a file is visible, the mod time cannot change. If it does due to appends, then the * processing semantics are undefined. @@ -81,13 +81,13 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( private val serializableConfOpt = conf.map(new SerializableWritable(_)) /** - * Minimum duration of remembering the information of selected files. Defaults to 1 minute. + * Minimum duration of remembering the information of selected files. Defaults to 60 seconds. * * Files with mod times older than this "window" of remembering will be ignored. So if new * files are visible within this window, then the file will get selected in the next batch. */ - private val minRememberDurationMin = - Minutes(ssc.conf.getLong("spark.streaming.minRememberDurationMin", 1L)) + private val minRememberDuration = + Seconds(ssc.conf.getLong("spark.streaming.minRememberDuration", 60L)) // This is a def so that it works during checkpoint recovery: private def clock = ssc.scheduler.clock @@ -105,7 +105,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( * selected and processed. */ private val numBatchesToRemember = FileInputDStream - .calculateNumBatchesToRemember(slideDuration, minRememberDurationMin) + .calculateNumBatchesToRemember(slideDuration, minRememberDuration) private val durationToRemember = slideDuration * numBatchesToRemember remember(durationToRemember) @@ -344,10 +344,10 @@ object FileInputDStream { /** * Calculate the number of last batches to remember, such that all the files selected in - * at least last minRememberDurationMin duration can be remembered. + * at least last minRememberDuration duration can be remembered. */ def calculateNumBatchesToRemember(batchDuration: Duration, - minRememberDurationMin: Duration): Int = { - math.ceil(minRememberDurationMin.milliseconds.toDouble / batchDuration.milliseconds).toInt + minRememberDuration: Duration): Int = { + math.ceil(minRememberDuration.milliseconds.toDouble / batchDuration.milliseconds).toInt } } From 766f9386acbc82de52106b1e3025df5729d15a46 Mon Sep 17 00:00:00 2001 From: emres Date: Tue, 14 Apr 2015 17:32:18 +0200 Subject: [PATCH 7/7] SPARK-3276 Switched to using newly added getTimeAsSeconds method. * Switched to using newly added getTimeAsSeconds method (see https://github.com/apache/spark/pull/5236) * Renamed minRememberDuration to minRememberDurationS to be compatible with the examples in the pull request above. --- .../spark/streaming/dstream/FileInputDStream.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 1b093b12f627..eca69f00188e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -86,8 +86,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( * Files with mod times older than this "window" of remembering will be ignored. So if new * files are visible within this window, then the file will get selected in the next batch. */ - private val minRememberDuration = - Seconds(ssc.conf.getLong("spark.streaming.minRememberDuration", 60L)) + private val minRememberDurationS = + Seconds(ssc.conf.getTimeAsSeconds("spark.streaming.minRememberDuration", "60s")) // This is a def so that it works during checkpoint recovery: private def clock = ssc.scheduler.clock @@ -105,7 +105,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]]( * selected and processed. */ private val numBatchesToRemember = FileInputDStream - .calculateNumBatchesToRemember(slideDuration, minRememberDuration) + .calculateNumBatchesToRemember(slideDuration, minRememberDurationS) private val durationToRemember = slideDuration * numBatchesToRemember remember(durationToRemember) @@ -344,10 +344,10 @@ object FileInputDStream { /** * Calculate the number of last batches to remember, such that all the files selected in - * at least last minRememberDuration duration can be remembered. + * at least last minRememberDurationS duration can be remembered. */ def calculateNumBatchesToRemember(batchDuration: Duration, - minRememberDuration: Duration): Int = { - math.ceil(minRememberDuration.milliseconds.toDouble / batchDuration.milliseconds).toInt + minRememberDurationS: Duration): Int = { + math.ceil(minRememberDurationS.milliseconds.toDouble / batchDuration.milliseconds).toInt } }