@@ -233,7 +233,7 @@ object CatalystTypeConverters {
case other => other
}

- /**
+ /**
* Converts Catalyst types used internally in rows to standard Scala types.
* This method is slow; for batch conversion you should use the converter
* produced by createToScalaConverter.
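A note on the pattern the comment above recommends. The slow path re-dispatches on the data type for every value, so batch callers should build the converter once and reuse it. A minimal sketch with a toy converter standing in for Catalyst's (the string type tags and the wrapper object are illustrative, not Spark's API):

    object ConverterSketch extends App {
      // Resolve the conversion once, outside the loop...
      def createToScalaConverter(dataType: String): Any => Any = dataType match {
        case "utf8" => v => new String(v.asInstanceOf[Array[Byte]], "UTF-8")
        case _      => (v: Any) => v
      }
      val convert = createToScalaConverter("utf8")
      // ...then apply the prebuilt function per value, with no type dispatch left.
      val values: Seq[Any] = Seq("a".getBytes("UTF-8"), "b".getBytes("UTF-8"))
      println(values.map(convert)) // List(a, b)
    }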
@@ -296,8 +296,8 @@ trait HiveTypeCoercion {
object InConversion extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
// Skip nodes whose children have not been resolved yet.
- case e if !e.childrenResolved => e
+ case e if !e.childrenResolved => e

case i @ In(a, b) if b.exists(_.dataType != a.dataType) =>
i.makeCopy(Array(a, b.map(Cast(_, a.dataType))))
}
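For orientation, the rule touched above rewrites In(a, b) so that every element of the value list shares a's type. A toy rendering of that rewrite with a stand-in AST (these case classes are illustrative, not Catalyst's):

    sealed trait Expr { def dataType: String }
    case class Literal(value: Any, dataType: String) extends Expr
    case class Cast(child: Expr, dataType: String) extends Expr
    case class In(value: Expr, list: Seq[Expr]) extends Expr { val dataType = "boolean" }

    // Mirrors the case above: if any list element's type differs from the
    // value's type, wrap every element in a Cast to the value's type.
    def coerceIn(i: In): In =
      if (i.list.exists(_.dataType != i.value.dataType))
        In(i.value, i.list.map(Cast(_, i.value.dataType)))
      else i

    coerceIn(In(Literal(1, "int"), Seq(Literal("1", "string"), Literal(2, "int"))))
    // => In(Literal(1,int),List(Cast(Literal(1,string),int), Cast(Literal(2,int),int)))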
@@ -29,7 +29,7 @@ case object Descending extends SortDirection
* An expression that can be used to sort a tuple. This class extends expression primarily so that
transformations over expressions will descend into its child.
*/
- case class SortOrder(child: Expression, direction: SortDirection) extends Expression
+ case class SortOrder(child: Expression, direction: SortDirection) extends Expression
with trees.UnaryNode[Expression] {

override def dataType: DataType = child.dataType
@@ -394,13 +394,13 @@ case class Sum(child: Expression) extends PartialAggregate with trees.UnaryNode[
* Combining      PartitionLevel      InputData
*                                <-- null
* Zero       <-- Zero           <-- null
- *
+ *
*            <-- null           <-- no data
- * null     <-- null           <-- no data
+ * null     <-- null           <-- no data
*/
case class CombineSum(child: Expression) extends AggregateExpression {
def this() = this(null)

override def children: Seq[Expression] = child :: Nil
override def nullable: Boolean = true
override def dataType: DataType = child.dataType
@@ -616,7 +616,7 @@ case class SumFunction(expr: Expression, base: AggregateExpression) extends Aggr

private val sum = MutableLiteral(null, calcType)

- private val addFunction =
+ private val addFunction =
Coalesce(Seq(Add(Coalesce(Seq(sum, zero)), Cast(expr, calcType)), sum, zero))

override def update(input: Row): Unit = {
@@ -634,7 +634,7 @@ case class SumFunction(expr: Expression, base: AggregateExpression) extends Aggr

case class CombineSumFunction(expr: Expression, base: AggregateExpression)
extends AggregateFunction {

def this() = this(null, null) // Required for serialization.

private val calcType =
@@ -649,12 +649,12 @@ case class CombineSumFunction(expr: Expression, base: AggregateExpression)

private val sum = MutableLiteral(null, calcType)

- private val addFunction =
+ private val addFunction =
Coalesce(Seq(Add(Coalesce(Seq(sum, zero)), Cast(expr, calcType)), sum, zero))

override def update(input: Row): Unit = {
val result = expr.eval(input)
- // partial sum result can be null only when no input rows present
+ // partial sum result can be null only when no input rows present
if(result != null) {
sum.update(addFunction, input)
}
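The Coalesce-based addFunction and the null guard in update together encode SUM's SQL contract, which the scaladoc diagram above traces: the running sum stays null until the first non-null input, so empty or all-null input yields null rather than zero. The same contract on plain Options (a sketch, not Spark's evaluation path):

    def updateSum(current: Option[Long], input: Option[Long]): Option[Long] =
      input match {
        case None    => current                         // null input: state unchanged
        case Some(v) => Some(current.getOrElse(0L) + v) // Coalesce(sum, zero) + value
      }

    Seq(Some(1L), None, Some(4L)).foldLeft(Option.empty[Long])(updateSum) // Some(5)
    Seq.empty[Option[Long]].foldLeft(Option.empty[Long])(updateSum)       // None: no rows => null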
@@ -180,7 +180,7 @@ case class Divide(left: Expression, right: Expression) extends BinaryArithmetic
case it: IntegralType => it.integral.asInstanceOf[Integral[Any]].quot
case other => sys.error(s"Type $other does not support numeric operations")
}

override def eval(input: Row): Any = {
val evalE2 = right.eval(input)
if (evalE2 == null || evalE2 == 0) {
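The guard on evalE2 reflects SQL division semantics: a null or zero divisor yields null rather than an exception. In miniature, with Option standing in for SQL null (an illustration, not Divide's actual code path):

    def sqlDivide(dividend: Option[Int], divisor: Option[Int]): Option[Int] =
      (dividend, divisor) match {
        case (Some(x), Some(y)) if y != 0 => Some(x / y)
        case _                            => None // null operand or division by zero
      }

    sqlDivide(Some(6), Some(3)) // Some(2)
    sqlDivide(Some(6), Some(0)) // None, not an ArithmeticException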
@@ -26,7 +26,7 @@ import org.apache.spark.sql.types._
case class CreateArray(children: Seq[Expression]) extends Expression {

override def foldable: Boolean = children.forall(_.foldable)

lazy val childTypes = children.map(_.dataType).distinct

override lazy val resolved =
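The foldable override propagates bottom-up: an array built purely from literals can be evaluated at optimization time, while one non-foldable child poisons the whole node. A toy tree showing the same rule (stand-in classes, not Catalyst's):

    sealed trait Expr { def foldable: Boolean }
    case class Lit(value: Any) extends Expr { val foldable = true }
    case class Attr(name: String) extends Expr { val foldable = false }
    case class MakeArray(children: Seq[Expr]) extends Expr {
      val foldable = children.forall(_.foldable) // same shape as CreateArray above
    }

    MakeArray(Seq(Lit(1), Lit(2))).foldable    // true: computable at plan time
    MakeArray(Seq(Lit(1), Attr("a"))).foldable // false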
@@ -26,7 +26,7 @@ import org.apache.spark.sql.types._
* @param f The math function.
* @param name The short name of the function
*/
- abstract class BinaryMathExpression(f: (Double, Double) => Double, name: String)
+ abstract class BinaryMathExpression(f: (Double, Double) => Double, name: String)
extends BinaryExpression with Serializable with ExpectsInputTypes { self: Product =>

override def symbol: String = null
@@ -24,7 +24,7 @@ import org.apache.spark.util.random.XORShiftRandom

/**
* A Random distribution generating expression.
- * TODO: This can be made generic to generate any type of random distribution, or any type of
+ * TODO: This can be made generic to generate any type of random distribution, or any type of
* StructType.
*
* Since this expression is stateful, it cannot be a case object.
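The "stateful, so it cannot be a case object" remark comes down to instance-owned RNG state. A compact illustration of the constraint, using plain scala.util.Random in place of XORShiftRandom:

    class Rand(seed: Long) {
      private val rng = new scala.util.Random(seed) // mutable, per-instance state
      def eval(): Double = rng.nextDouble()
    }

    // Each Rand in a plan must own its own generator; a case object would make
    // every occurrence share (and advance) a single global stream.
    val a = new Rand(42)
    val b = new Rand(42)
    a.eval() == b.eval() // true only because each instance has its own Random(42)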
@@ -133,7 +133,7 @@ trait CaseConversionExpression extends ExpectsInputTypes {
* A function that converts the characters of a string to uppercase.
*/
case class Upper(child: Expression) extends UnaryExpression with CaseConversionExpression {

override def convert(v: UTF8String): UTF8String = v.toUpperCase()

override def toString: String = s"Upper($child)"
@@ -143,7 +143,7 @@ case class Upper(child: Expression) extends UnaryExpression with CaseConversionE
* A function that converts the characters of a string to lowercase.
*/
case class Lower(child: Expression) extends UnaryExpression with CaseConversionExpression {

override def convert(v: UTF8String): UTF8String = v.toLowerCase()

override def toString: String = s"Lower($child)"
@@ -223,7 +223,7 @@ case class Substring(str: Expression, pos: Expression, len: Expression)
@inline
def slicePos(startPos: Int, sliceLen: Int, length: () => Int): (Int, Int) = {
// Hive and SQL use one-based indexing for SUBSTR arguments but also accept zero and
- // negative indices for start positions. If a start index i is greater than 0, it
+ // negative indices for start positions. If a start index i is greater than 0, it
// refers to element i-1 in the sequence. If a start index i is less than 0, it refers
// to the -ith element before the end of the sequence. If a start index i is 0, it
// refers to the first element.
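The indexing rules spelled out in that comment, condensed into a standalone function (a behavioral sketch of SUBSTR, not Spark's implementation):

    def substr(s: String, pos: Int, len: Int): String = {
      // pos > 0: one-based from the start; pos < 0: counts back from the end;
      // pos == 0: treated like the first element.
      val start = if (pos > 0) pos - 1 else if (pos < 0) math.max(s.length + pos, 0) else 0
      val end   = math.min(start + math.max(len, 0), s.length)
      if (start >= s.length) "" else s.substring(start, end)
    }

    substr("Spark SQL", 7, 3)  // "SQL"
    substr("Spark SQL", -3, 3) // "SQL"
    substr("Spark SQL", 0, 5)  // "Spark"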
@@ -265,7 +265,7 @@ object StructType {
case _ =>
throw new SparkException(s"Failed to merge incompatible data types $left and $right")
}

private[sql] def fieldsMap(fields: Array[StructField]): Map[String, StructField] = {
import scala.collection.breakOut
fields.map(s => (s.name, s))(breakOut)
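fieldsMap builds its Map straight from the traversal: breakOut supplies a Map builder to map, skipping the intermediate sequence that would otherwise be created and then converted. The same trick outside Spark, on the Scala 2.x collections this code targets (toy Field class):

    import scala.collection.breakOut

    case class Field(name: String, dataType: String)
    val fields = Array(Field("a", "long"), Field("b", "float"))

    // The explicit type ascription is what lets breakOut choose Map as the target.
    val byName: Map[String, Field] = fields.map(f => (f.name, f))(breakOut)
    byName("a") // Field(a,long)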
@@ -1209,15 +1209,15 @@ class ExpressionEvaluationSuite extends ExpressionEvaluationBaseSuite {
}

/**
- * Used for testing math functions for DataFrames.
+ * Used for testing math functions for DataFrames.
* @param c The DataFrame function
* @param f The corresponding function in scala.math
* @param domain The set of values to run the function with
* @param expectNull Whether the given values should return null or not
* @tparam T Generic type for primitives
*/
def unaryMathFunctionEvaluation[@specialized(Int, Double, Float, Long) T](
- c: Expression => Expression,
+ c: Expression => Expression,
f: T => T,
domain: Iterable[T] = (-20 to 20).map(_ * 0.1),
expectNull: Boolean = false): Unit = {
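The harness that scaladoc describes, reduced to its essence: sweep a numeric domain, apply the function under test, and compare against the scala.math oracle. A plain-Scala sketch of the checking pattern, without the Expression plumbing (NaN stands in for the SQL null the real suite expects):

    def checkUnaryMathFunction(
        f: Double => Double,         // function under test
        reference: Double => Double, // the scala.math oracle
        domain: Iterable[Double] = (-20 to 20).map(_ * 0.1),
        expectNull: Boolean = false): Unit =
      domain.foreach { x =>
        val actual = f(x)
        if (expectNull) assert(actual.isNaN)
        else assert(actual == reference(x), s"mismatch at $x")
      }

    checkUnaryMathFunction(math.sin, math.sin)
    checkUnaryMathFunction(math.log, math.log, domain = Seq(-0.5), expectNull = true)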
@@ -71,15 +71,15 @@ class CombiningLimitsSuite extends PlanTest {

comparePlans(optimized, correctAnswer)
}

test("limits: combines two limits after ColumnPruning") {
val originalQuery =
testRelation
.select('a)
.limit(2)
.select('a)
.limit(5)

val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer =
testRelation
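What the new test pins down, reduced to a toy plan language: once column pruning brings two limits next to each other, they collapse to the smaller bound. A sketch of the idea, not Spark's Optimize rule:

    sealed trait Plan
    case class Relation(name: String) extends Plan
    case class Limit(n: Int, child: Plan) extends Plan

    def combineLimits(plan: Plan): Plan = plan match {
      case Limit(outer, Limit(inner, child)) =>
        combineLimits(Limit(math.min(inner, outer), child))
      case other => other
    }

    combineLimits(Limit(5, Limit(2, Relation("t")))) // Limit(2,Relation(t))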
@@ -248,7 +248,7 @@ class ConstantFoldingSuite extends PlanTest {

comparePlans(optimized, correctAnswer)
}

test("Constant folding test: Fold In(v, list) into true or false") {
var originalQuery =
testRelation
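The folding this test exercises only fires when both the value and the whole list are literals; the OptimizeInSuite test further down makes the converse point, that an attribute blocks it. A toy version of the rewrite:

    sealed trait Expr
    case class Lit(value: Any) extends Expr
    case class Attr(name: String) extends Expr
    case class InExpr(value: Expr, list: Seq[Expr]) extends Expr

    def foldIn(e: Expr): Expr = e match {
      case InExpr(Lit(v), list) if list.forall(_.isInstanceOf[Lit]) =>
        Lit(list.collect { case Lit(x) => x }.contains(v)) // decidable at plan time
      case other => other // an attribute leaves the predicate for runtime
    }

    foldIn(InExpr(Lit(1), Seq(Lit(1), Lit(2))))    // Lit(true)
    foldIn(InExpr(Attr("a"), Seq(Lit(1), Lit(2)))) // unchanged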
@@ -93,7 +93,7 @@ class FilterPushdownSuite extends PlanTest {

comparePlans(optimized, correctAnswer)
}

test("column pruning for Project(ne, Limit)") {
val originalQuery =
testRelation
@@ -109,7 +109,7 @@ class FilterPushdownSuite extends PlanTest {

comparePlans(optimized, correctAnswer)
}

// Tests below this line are not yet implemented.
test("simple push down") {
val originalQuery =
@@ -57,7 +57,7 @@ class OptimizeInSuite extends PlanTest {

comparePlans(optimized, correctAnswer)
}

test("OptimizedIn test: In clause not optimized in case filter has attributes") {
val originalQuery =
testRelation
@@ -71,7 +71,7 @@ class DataTypeSuite extends SparkFunSuite {

test("fieldsMap returns map of name to StructField") {
val struct = StructType(
StructField("a", LongType) ::
StructField("a", LongType) ::
StructField("b", FloatType) :: Nil)

val mapped = StructType.fieldsMap(struct.fields)
@@ -90,7 +90,7 @@ class DataTypeSuite extends SparkFunSuite {

val right = StructType(List())
val merged = left.merge(right)

assert(merged === left)
}

@@ -133,7 +133,7 @@ class DataTypeSuite extends SparkFunSuite {

val right = StructType(
StructField("b", LongType) :: Nil)

intercept[SparkException] {
left.merge(right)
}
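Taken together, these tests describe merge's contract: merging with an empty struct is an identity, and two fields sharing a name but not a type raise SparkException. The same contract over plain maps (a sketch; the real method also recurses into nested types):

    def merge(left: Map[String, String], right: Map[String, String]): Map[String, String] =
      right.foldLeft(left) { case (acc, (name, tpe)) =>
        acc.get(name) match {
          case Some(existing) if existing != tpe =>
            sys.error(s"Failed to merge incompatible data types $existing and $tpe")
          case _ => acc + (name -> tpe)
        }
      }

    merge(Map("a" -> "long"), Map.empty)           // Map(a -> long): identity
    merge(Map("a" -> "long"), Map("a" -> "float")) // throws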
@@ -249,7 +249,7 @@ class GroupedData protected[sql](
def mean(colNames: String*): DataFrame = {
aggregateNumericColumns(colNames : _*)(Average)
}

/**
* Compute the max value for each numeric column for each group.
* The resulting [[DataFrame]] will also contain the grouping columns.
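Caller-side view of these aggregations, for orientation (assumes a sqlContext in scope and the 1.x-era DataFrame API this diff targets; column names are made up):

    val df = sqlContext
      .createDataFrame(Seq(("a", 1.0), ("a", 3.0), ("b", 4.0)))
      .toDF("key", "value")

    df.groupBy("key").mean("value") // avg per group, grouping column retained
    df.groupBy("key").max("value")  // max per group, grouping column retained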
@@ -106,7 +106,7 @@ private[r] object SQLUtils {

dfCols.map { col =>
colToRBytes(col)
}
}
}

def convertRowsToColumns(localDF: Array[Row], numCols: Int): Array[Array[Any]] = {
@@ -121,7 +121,7 @@ private[r] object SQLUtils {
val numRows = col.length
val bos = new ByteArrayOutputStream()
val dos = new DataOutputStream(bos)

SerDe.writeInt(dos, numRows)

col.map { item =>
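colToRBytes frames each column as a row count followed by the values, so the R side knows how many entries to read back. The framing in isolation, with plain DataOutputStream calls standing in for SerDe's helpers:

    import java.io.{ByteArrayOutputStream, DataOutputStream}

    val col = Array(1, 2, 3)
    val bos = new ByteArrayOutputStream()
    val dos = new DataOutputStream(bos)

    dos.writeInt(col.length)  // SerDe.writeInt(dos, numRows) above
    col.foreach(dos.writeInt) // one write per item, as in the col.map { item => ... } loop
    dos.flush()
    bos.toByteArray           // length-prefixed payload handed back to R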
@@ -138,15 +138,15 @@ case class GeneratedAggregate(
case UnscaledValue(e) => e
case _ => expr
}
- // partial sum result can be null only when no input rows present
+ // partial sum result can be null only when no input rows present
val updateFunction = If(
IsNotNull(actualExpr),
Coalesce(
Add(
- Coalesce(currentSum :: zero :: Nil),
+ Coalesce(currentSum :: zero :: Nil),
Cast(expr, calcType)) :: currentSum :: zero :: Nil),
currentSum)

val result =
expr.dataType match {
case DecimalType.Fixed(_, _) =>
@@ -155,7 +155,7 @@ case class GeneratedAggregate(
}

AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, updateFunction :: Nil, result)

case m @ Max(expr) =>
val currentMax = AttributeReference("currentMax", expr.dataType, nullable = true)()
val initialValue = Literal.create(null, expr.dataType)
@@ -65,7 +65,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
* :: DeveloperApi ::
* Sample the dataset.
* @param lowerBound Lower-bound of the sampling probability (usually 0.0)
- * @param upperBound Upper-bound of the sampling probability. The expected fraction sampled
+ * @param upperBound Upper-bound of the sampling probability. The expected fraction sampled
* will be ub - lb.
* @param withReplacement Whether to sample with replacement.
* @param seed the random seed
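A quick empirical check of the "expected fraction sampled will be ub - lb" claim, using plain scala.util.Random rather than Spark's sampler:

    val rng = new scala.util.Random(42)
    val (lb, ub) = (0.25, 0.75)
    val n = 100000
    val kept = (1 to n).count { _ => val r = rng.nextDouble(); lb <= r && r < ub }
    kept.toDouble / n // ~0.5, i.e. ub - lb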
@@ -62,7 +62,7 @@ private[sql] object FrequentItems extends Logging {
}

/**
- * Finding frequent items for columns, possibly with false positives. Using the
+ * Finding frequent items for columns, possibly with false positives. Using the
* frequent element count algorithm described in
* [[http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou]].
* The `support` should be greater than 1e-4.
@@ -75,7 +75,7 @@ private[sql] object FrequentItems extends Logging {
* @return A Local DataFrame with the Array of frequent items for each column.
*/
private[sql] def singlePassFreqItems(
- df: DataFrame,
+ df: DataFrame,
cols: Seq[String],
support: Double): DataFrame = {
require(support >= 1e-4, s"support ($support) must be greater than 1e-4.")
@@ -88,7 +88,7 @@ private[sql] object FrequentItems extends Logging {
val index = originalSchema.fieldIndex(name)
(name, originalSchema.fields(index).dataType)
}

val freqItems = df.select(cols.map(Column(_)) : _*).rdd.aggregate(countMaps)(
seqOp = (counts, row) => {
var i = 0
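For intuition, a single-column rendition of the cited Karp, Schenker, and Papadimitriou algorithm: keep at most k-1 counters, decrement them all when a new item arrives and no slot is free, and every item with frequency above 1/k survives, possibly alongside false positives, exactly as the scaladoc warns. This is a mutable-Map sketch, not the aggregate-based implementation above:

    def freqItems[T](xs: Iterable[T], k: Int): Map[T, Long] = {
      val counts = scala.collection.mutable.Map.empty[T, Long]
      for (x <- xs) {
        if (counts.contains(x)) counts(x) += 1
        else if (counts.size < k - 1) counts(x) = 1L
        else counts.transform((_, c) => c - 1).retain((_, c) => c > 0) // decrement all
      }
      counts.toMap // superset of the items with frequency > 1/k
    }

    freqItems(Seq(1, 1, 1, 2, 3, 1, 1), k = 2) // Map(1 -> 3): the majority item survives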
@@ -25,7 +25,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

private[sql] object StatFunctions extends Logging {

/** Calculate the Pearson Correlation Coefficient for the given columns */
private[sql] def pearsonCorrelation(df: DataFrame, cols: Seq[String]): Double = {
val counts = collectStatisticalData(df, cols)
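For reference, the quantity pearsonCorrelation computes is r = Σ(x−x̄)(y−ȳ) / (√Σ(x−x̄)² · √Σ(y−ȳ)²). A direct transcription on plain collections (the real code gathers these moments in a single pass over the DataFrame):

    def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
      require(xs.length == ys.length && xs.nonEmpty)
      val n   = xs.length
      val mx  = xs.sum / n
      val my  = ys.sum / n
      val cov = xs.zip(ys).map { case (x, y) => (x - mx) * (y - my) }.sum
      val sx  = math.sqrt(xs.map(x => (x - mx) * (x - mx)).sum)
      val sy  = math.sqrt(ys.map(y => (y - my) * (y - my)).sum)
      cov / (sx * sy)
    }

    pearson(Seq(1.0, 2.0, 3.0), Seq(2.0, 4.0, 6.0)) // 1.0: perfectly correlated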
@@ -1299,7 +1299,7 @@ object functions {
* @since 1.4.0
*/
def toRadians(columnName: String): Column = toRadians(Column(columnName))


//////////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////////////////
@@ -304,7 +304,7 @@ private[sql] class JDBCRDD(

// Each JDBC-to-Catalyst conversion corresponds to a tag defined here so that
// we don't have to potentially poke around in the Metadata once for every
- // row.
+ // row.
// Is there a better way to do this? I'd rather be using a type that
// contains only the tags I define.
abstract class JDBCConversion
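The design that comment sketches: decide each column's conversion once, up front, as a tag, then dispatch on the precomputed tag per row instead of poking at Metadata repeatedly. In miniature (the tag names and type strings here are illustrative, not JDBCRDD's full set):

    sealed abstract class Conversion
    case object IntegerConversion extends Conversion
    case object StringConversion  extends Conversion

    // Computed once per query from the schema...
    def toConversion(sqlType: String): Conversion = sqlType match {
      case "INTEGER" => IntegerConversion
      case _         => StringConversion
    }
    val conversions = Seq("INTEGER", "VARCHAR").map(toConversion)
    // ...so per-row work is a match on conversions(i), nothing more.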
@@ -54,7 +54,7 @@ private[sql] object JDBCRelation {
if (numPartitions == 1) return Array[Partition](JDBCPartition(null, 0))
// Overflow and silliness can happen if you subtract then divide.
// Here we get a little roundoff, but that's (hopefully) OK.
- val stride: Long = (partitioning.upperBound / numPartitions
+ val stride: Long = (partitioning.upperBound / numPartitions
- partitioning.lowerBound / numPartitions)
var i: Int = 0
var currentValue: Long = partitioning.lowerBound
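The "overflow and silliness" warning deserves a concrete check: with extreme bounds, subtract-then-divide wraps around, while divide-then-subtract costs only a little roundoff (bounds below are deliberately extreme):

    val upper = Long.MaxValue
    val lower = Long.MinValue + 1
    val numPartitions = 4L

    upper - lower                                 // wraps: the true difference exceeds Long
    upper / numPartitions - lower / numPartitions // ≈ 2^61 per stride, computed safely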
@@ -140,10 +140,10 @@ private[sql] case class JDBCRelation(
filters,
parts)
}

override def insert(data: DataFrame, overwrite: Boolean): Unit = {
data.write
.mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
.jdbc(url, table, properties)
}
}
}
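The insert override routes through the DataFrameWriter API, so the caller-side equivalent looks like this (assumes a DataFrame df; the URL, table, and credentials are placeholders):

    import java.util.Properties
    import org.apache.spark.sql.SaveMode

    val props = new Properties()
    props.setProperty("user", "test") // placeholder

    df.write
      .mode(SaveMode.Overwrite) // SaveMode.Append when overwrite = false
      .jdbc("jdbc:h2:mem:testdb", "TEST.PEOPLE", props)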