From 41148c605ddf48c155fc03611bca03af9d4e25a3 Mon Sep 17 00:00:00 2001 From: guoxiaolong Date: Tue, 30 Jan 2018 19:30:49 +0800 Subject: [PATCH 1/2] [SPARK-23270][Streaming][WEB-UI]FileInputDStream Streaming UI 's records should not be set to the default value of 0, it should be the total number of rows of new files. --- .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index b8a5a96faf15..a9eb51e1fc6f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -157,7 +157,9 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( val metadata = Map( "files" -> newFiles.toList, StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n")) - val inputInfo = StreamInputInfo(id, 0, metadata) + var numRecords = 0L + rdds.foreach(rdd => numRecords = numRecords + rdd.count) + val inputInfo = StreamInputInfo(id, numRecords, metadata) ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) rdds } From 742034e5d2ad6996dfbd8ee54019650dbe3825ad Mon Sep 17 00:00:00 2001 From: guoxiaolong Date: Wed, 31 Jan 2018 10:20:40 +0800 Subject: [PATCH 2/2] fix the code --- .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index a9eb51e1fc6f..a5201ad78de3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -157,9 +157,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( val metadata = Map( "files" -> newFiles.toList, StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n")) - var numRecords = 0L - rdds.foreach(rdd => numRecords = numRecords + rdd.count) - val inputInfo = StreamInputInfo(id, numRecords, metadata) + val inputInfo = StreamInputInfo(id, rdds.map(_.count).sum, metadata) ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) rdds }