META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -5,6 +5,5 @@ org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 org.apache.spark.sql.execution.datasources.text.TextFileFormat
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
-org.apache.spark.sql.execution.streaming.RateSourceProvider
+org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
-org.apache.spark.sql.execution.streaming.sources.RateSourceProviderV2
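For context, registering RateStreamProvider in this service file is what keeps the short format name resolving after the two old provider classes are dropped. A minimal usage sketch, not part of the diff (the object name is made up; it assumes the provider's short name stays "rate" and the usual rowsPerSecond/numPartitions options):

import org.apache.spark.sql.SparkSession

object RateSourceSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("rate-smoke").getOrCreate()

    // "rate" is resolved through META-INF/services -> DataSourceRegister, which now
    // points at RateStreamProvider instead of the old RateSourceProvider classes.
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "6")
      .option("numPartitions", "5")
      .load()

    // The rate source schema is (timestamp: TimestampType, value: LongType).
    df.writeStream.format("console").start().awaitTermination()
  }
}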
DataSource.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
+import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
@@ -566,6 +566,7 @@ object DataSource extends Logging {
     val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
     val nativeOrc = classOf[OrcFileFormat].getCanonicalName
     val socket = classOf[TextSocketSourceProvider].getCanonicalName
+    val rate = classOf[RateStreamProvider].getCanonicalName

     Map(
       "org.apache.spark.sql.jdbc" -> jdbc,
@@ -587,7 +588,8 @@ object DataSource extends Logging {
       "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
       "org.apache.spark.ml.source.libsvm" -> libsvm,
       "com.databricks.spark.csv" -> csv,
-      "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket
+      "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket,
+      "org.apache.spark.sql.execution.streaming.RateSourceProvider" -> rate
     )
   }
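The extra map entry keeps queries that still name the old V1 class working. A hedged sketch of the effect, not part of the diff (the object below is a trimmed stand-in for the backward-compatibility map consulted during data source lookup, reduced to the two streaming entries):

// Illustrative only: how the legacy class name ends up at the new provider.
object BackwardCompatLookupSketch {
  // Trimmed-down stand-in for the backwardCompatibilityMap above.
  private val compat = Map(
    "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" ->
      "org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider",
    "org.apache.spark.sql.execution.streaming.RateSourceProvider" ->
      "org.apache.spark.sql.execution.streaming.sources.RateStreamProvider")

  def resolve(provider: String): String = compat.getOrElse(provider, provider)

  def main(args: Array[String]): Unit = {
    // Prints the new RateStreamProvider class name.
    println(resolve("org.apache.spark.sql.execution.streaming.RateSourceProvider"))
  }
}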


This file was deleted.

RateStreamContinuousReader.scala
@@ -24,8 +24,8 @@ import org.json4s.jackson.Serialization

 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.streaming.{RateSourceProvider, RateStreamOffset, ValueRunTimeMsPair}
-import org.apache.spark.sql.execution.streaming.sources.RateStreamSourceV2
+import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair}
+import org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
@@ -40,8 +40,8 @@ class RateStreamContinuousReader(options: DataSourceOptions)

   val creationTime = System.currentTimeMillis()

-  val numPartitions = options.get(RateStreamSourceV2.NUM_PARTITIONS).orElse("5").toInt
-  val rowsPerSecond = options.get(RateStreamSourceV2.ROWS_PER_SECOND).orElse("6").toLong
+  val numPartitions = options.get(RateStreamProvider.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = options.get(RateStreamProvider.ROWS_PER_SECOND).orElse("6").toLong
   val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble

   override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
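As a quick sanity check on the defaults above (illustrative, not part of the diff): with rowsPerSecond = 6 and numPartitions = 5, each continuous partition is expected to emit 1.2 rows per second.

object PerPartitionRateSketch extends App {
  // Defaults from the reader above: 6 rows/sec spread over 5 partitions.
  val numPartitions = 5
  val rowsPerSecond = 6L
  val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
  println(perPartitionRate)  // 1.2 rows per second per partition
}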
@@ -57,12 +57,12 @@ class RateStreamContinuousReader(options: DataSourceOptions)
     RateStreamOffset(Serialization.read[Map[Int, ValueRunTimeMsPair]](json))
   }

-  override def readSchema(): StructType = RateSourceProvider.SCHEMA
+  override def readSchema(): StructType = RateStreamProvider.SCHEMA

   private var offset: Offset = _

   override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
-    this.offset = offset.orElse(RateStreamSourceV2.createInitialOffset(numPartitions, creationTime))
+    this.offset = offset.orElse(createInitialOffset(numPartitions, creationTime))
   }

   override def getStartOffset(): Offset = offset
@@ -98,6 +98,19 @@ class RateStreamContinuousReader(options: DataSourceOptions)
   override def commit(end: Offset): Unit = {}
   override def stop(): Unit = {}

+  private def createInitialOffset(numPartitions: Int, creationTimeMs: Long) = {
+    RateStreamOffset(
+      Range(0, numPartitions).map { i =>
+        // Note that the starting offset is exclusive, so we have to decrement the starting value
+        // by the increment that will later be applied. The first row output in each
+        // partition will have a value equal to the partition index.
+        (i,
+          ValueRunTimeMsPair(
+            (i - numPartitions).toLong,
+            creationTimeMs))
+      }.toMap)
+  }
+
 }

 case class RateStreamContinuousDataReaderFactory(
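To make the exclusive-start comment in createInitialOffset concrete: with numPartitions = 3, partitions 0, 1, and 2 start at -3, -2, and -1, so after each partition's first increment of numPartitions its first emitted value equals its own index. A standalone sketch, not part of the diff, using stand-in types rather than the Spark-internal RateStreamOffset and ValueRunTimeMsPair:

object InitialOffsetSketch extends App {
  case class ValueRunTimeMs(value: Long, runTimeMs: Long)

  def initialOffsets(numPartitions: Int, creationTimeMs: Long): Map[Int, ValueRunTimeMs] =
    Range(0, numPartitions).map { i =>
      // Start one "increment" (numPartitions) below the partition index: the start is
      // exclusive, so the first row each partition emits has value == partition index.
      i -> ValueRunTimeMs((i - numPartitions).toLong, creationTimeMs)
    }.toMap

  println(initialOffsets(numPartitions = 3, creationTimeMs = 0L))
  // Map(0 -> ValueRunTimeMs(-3,0), 1 -> ValueRunTimeMs(-2,0), 2 -> ValueRunTimeMs(-1,0))
}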