Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix columnar tests
  • Loading branch information
nikolamand-db committed Jun 20, 2024
commit 59993b0fb0236e510998462137d511c74913b318
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private[sql] object ColumnAccessor {
new LongColumnAccessor(buf)
case FloatType => new FloatColumnAccessor(buf)
case DoubleType => new DoubleColumnAccessor(buf)
case dt: StringType => new StringColumnAccessor(buf, dt)
case s: StringType => new StringColumnAccessor(buf, s)
case BinaryType => new BinaryColumnAccessor(buf)
case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS =>
new CompactDecimalColumnAccessor(buf, dt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import scala.collection.mutable
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.types.{PhysicalBooleanType, PhysicalByteType, PhysicalDataType, PhysicalDoubleType, PhysicalFloatType, PhysicalIntegerType, PhysicalLongType, PhysicalShortType, PhysicalStringType}
import org.apache.spark.sql.catalyst.util.CollationFactory
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.vectorized.WritableColumnVector
Expand Down Expand Up @@ -177,8 +176,7 @@ private[columnar] case object RunLengthEncoding extends CompressionScheme {
}

override def supports(columnType: ColumnType[_]): Boolean = columnType match {
case INT | LONG | SHORT | BYTE | BOOLEAN => true
case STRING(CollationFactory.UTF8_BINARY_COLLATION_ID) => true
case INT | LONG | SHORT | BYTE | _: STRING | BOOLEAN => true
case _ => false
}

Expand Down Expand Up @@ -375,7 +373,7 @@ private[columnar] case object DictionaryEncoding extends CompressionScheme {
}

override def supports(columnType: ColumnType[_]): Boolean = columnType match {
case INT | LONG | STRING(CollationFactory.UTF8_BINARY_COLLATION_ID) => true
case INT | LONG | _: STRING => true
case _ => false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ class ColumnStatsSuite extends SparkFunSuite {
testColumnStats(classOf[LongColumnStats], LONG, Array(Long.MaxValue, Long.MinValue, 0))
testColumnStats(classOf[FloatColumnStats], FLOAT, Array(Float.MaxValue, Float.MinValue, 0))
testColumnStats(classOf[DoubleColumnStats], DOUBLE, Array(Double.MaxValue, Double.MinValue, 0))
testColumnStats(classOf[StringColumnStats], STRING(StringType), Array(null, null, 0))
testDecimalColumnStats(Array(null, null, 0))
testIntervalColumnStats(Array(null, null, 0))
testStringColumnStats(Array(null, null, 0))

def testColumnStats[T <: PhysicalDataType, U <: ColumnStats](
columnStatsClass: Class[U],
Expand Down Expand Up @@ -142,4 +142,60 @@ class ColumnStatsSuite extends SparkFunSuite {
}
}
}

def testStringColumnStats[T <: PhysicalDataType, U <: ColumnStats](
initialStatistics: Array[Any]): Unit = {

Seq("UTF8_BINARY", "UTF8_LCASE", "UNICODE", "UNICODE_CI").foreach(collation => {
val columnType = STRING(StringType(collation))

test(s"STRING($collation): empty") {
val columnStats = new StringColumnStats(StringType(collation).collationId)
columnStats.collectedStatistics.zip(initialStatistics).foreach {
case (actual, expected) => assert(actual === expected)
}
}

test(s"STRING($collation): non-empty") {
import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._

val columnStats = new StringColumnStats(StringType(collation).collationId)
val rows = Seq.fill(10)(makeRandomRow(columnType)) ++ Seq.fill(10)(makeNullRow(1))
rows.foreach(columnStats.gatherStats(_, 0))

val values = rows.take(10).map(_.get(0,
ColumnarDataTypeUtils.toLogicalDataType(columnType.dataType)))
val ordering = PhysicalDataType.ordering(
ColumnarDataTypeUtils.toLogicalDataType(columnType.dataType))
val stats = columnStats.collectedStatistics

assertResult(values.min(ordering), "Wrong lower bound")(stats(0))
assertResult(values.max(ordering), "Wrong upper bound")(stats(1))
assertResult(10, "Wrong null count")(stats(2))
assertResult(20, "Wrong row count")(stats(3))
assertResult(stats(4), "Wrong size in bytes") {
rows.map { row =>
if (row.isNullAt(0)) 4 else columnType.actualSize(row, 0)
}.sum
}
}
})

test("STRING(UTF8_LCASE): collation-defined ordering") {
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.unsafe.types.UTF8String

val columnStats = new StringColumnStats(StringType("UTF8_LCASE").collationId)
val rows = Seq("a", "A").map(str => {
val row = new GenericInternalRow(1)
row(0) = UTF8String.fromString(str)
row
})
rows.foreach(columnStats.gatherStats(_, 0))

val stats = columnStats.collectedStatistics
assertResult(UTF8String.fromString("a"), "Wrong lower bound")(stats(0))
assertResult(UTF8String.fromString("a"), "Wrong upper bound")(stats(1))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.catalyst.types.{PhysicalArrayType, PhysicalDataType, PhysicalMapType, PhysicalStructType}
import org.apache.spark.sql.catalyst.util.CollationFactory
import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
Expand All @@ -41,7 +42,7 @@ class ColumnTypeSuite extends SparkFunSuite {
NULL -> 0, BOOLEAN -> 1, BYTE -> 1, SHORT -> 2, INT -> 4, LONG -> 8,
FLOAT -> 4, DOUBLE -> 8, COMPACT_DECIMAL(15, 10) -> 8, LARGE_DECIMAL(20, 10) -> 12,
STRING(StringType) -> 8, STRING(StringType("UTF8_LCASE")) -> 8,
STRING(StringType("UNICODE")) -> 8, STRING(StringType("UNICODE_CO")) -> 8,
STRING(StringType("UNICODE")) -> 8, STRING(StringType("UNICODE_CI")) -> 8,
BINARY -> 16, STRUCT_TYPE -> 20, ARRAY_TYPE -> 28, MAP_TYPE -> 68,
CALENDAR_INTERVAL -> 16)

Expand Down Expand Up @@ -114,20 +115,28 @@ class ColumnTypeSuite extends SparkFunSuite {
testColumnType(CALENDAR_INTERVAL)

def testNativeColumnType[T <: PhysicalDataType](columnType: NativeColumnType[T]): Unit = {
testColumnType[T#InternalType](columnType)
val typeName = columnType match {
case s: STRING =>
val collation = CollationFactory.fetchCollation(s.collationId).collationName
Some(if (collation == "UTF8_BINARY") "STRING" else s"STRING($collation)")
case _ => None
}
testColumnType[T#InternalType](columnType, typeName)
}

def testColumnType[JvmType](columnType: ColumnType[JvmType]): Unit = {

def testColumnType[JvmType](
columnType: ColumnType[JvmType],
typeName: Option[String] = None): Unit = {
val proj = UnsafeProjection.create(
Array[DataType](ColumnarDataTypeUtils.toLogicalDataType(columnType.dataType)))
val converter = CatalystTypeConverters.createToScalaConverter(
ColumnarDataTypeUtils.toLogicalDataType(columnType.dataType))
val seq = (0 until 4).map(_ => proj(makeRandomRow(columnType)).copy())
val totalSize = seq.map(_.getSizeInBytes).sum
val bufferSize = Math.max(DEFAULT_BUFFER_SIZE, totalSize)
val testName = typeName.getOrElse(columnType.toString)

test(s"$columnType append/extract") {
test(s"$testName append/extract") {
val buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.nativeOrder())
seq.foreach(r => columnType.append(columnType.getField(r, 0), buffer))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.catalyst.types.{PhysicalArrayType, PhysicalMapType, PhysicalStructType}
import org.apache.spark.sql.catalyst.util.CollationFactory
import org.apache.spark.sql.types._

class TestNullableColumnAccessor[JvmType](
Expand All @@ -41,22 +42,33 @@ object TestNullableColumnAccessor {
class NullableColumnAccessorSuite extends SparkFunSuite {
import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._

Seq(
val stringTypes = Seq(
STRING(StringType),
STRING(StringType("UTF8_LCASE")),
STRING(StringType("UNICODE")),
STRING(StringType("UNICODE_CI")))
val otherTypes = Seq(
NULL, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE,
STRING(StringType), STRING(StringType("UTF8_LCASE")), STRING(StringType("UNICODE")),
STRING(StringType("UNICODE_CI")), BINARY, COMPACT_DECIMAL(15, 10), LARGE_DECIMAL(20, 10),
BINARY, COMPACT_DECIMAL(15, 10), LARGE_DECIMAL(20, 10),
STRUCT(PhysicalStructType(Array(StructField("a", StringType)))),
ARRAY(PhysicalArrayType(IntegerType, true)),
MAP(PhysicalMapType(IntegerType, StringType, true)),
CALENDAR_INTERVAL)
.foreach {

stringTypes.foreach(s => {
val collation = CollationFactory.fetchCollation(s.collationId).collationName
val typeName = if (collation == "UTF8_BINARY") "STRING" else s"STRING($collation)"
testNullableColumnAccessor(s, Some(typeName))
})
otherTypes.foreach {
testNullableColumnAccessor(_)
}

def testNullableColumnAccessor[JvmType](
columnType: ColumnType[JvmType]): Unit = {
columnType: ColumnType[JvmType],
testTypeName: Option[String] = None): Unit = {

val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
val typeName = testTypeName.getOrElse(columnType.getClass.getSimpleName.stripSuffix("$"))
val nullRow = makeNullRow(1)

test(s"Nullable $typeName column accessor: empty column") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.catalyst.types.{PhysicalArrayType, PhysicalMapType, PhysicalStructType}
import org.apache.spark.sql.catalyst.util.CollationFactory
import org.apache.spark.sql.types._

class TestNullableColumnBuilder[JvmType](columnType: ColumnType[JvmType])
Expand All @@ -39,22 +40,33 @@ object TestNullableColumnBuilder {
class NullableColumnBuilderSuite extends SparkFunSuite {
import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._

Seq(
val stringTypes = Seq(
STRING(StringType),
STRING(StringType("UTF8_LCASE")),
STRING(StringType("UNICODE")),
STRING(StringType("UNICODE_CI")))
val otherTypes = Seq(
BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE,
STRING(StringType), STRING(StringType("UTF8_LCASE")), STRING(StringType("UNICODE")),
STRING(StringType("UNICODE_CI")), BINARY, COMPACT_DECIMAL(15, 10), LARGE_DECIMAL(20, 10),
BINARY, COMPACT_DECIMAL(15, 10), LARGE_DECIMAL(20, 10),
STRUCT(PhysicalStructType(Array(StructField("a", StringType)))),
ARRAY(PhysicalArrayType(IntegerType, true)),
MAP(PhysicalMapType(IntegerType, StringType, true)),
CALENDAR_INTERVAL)
.foreach {

stringTypes.foreach(s => {
val collation = CollationFactory.fetchCollation(s.collationId).collationName
val typeName = if (collation == "UTF8_BINARY") "STRING" else s"STRING($collation)"
testNullableColumnBuilder(s, Some(typeName))
})
otherTypes.foreach {
testNullableColumnBuilder(_)
}

def testNullableColumnBuilder[JvmType](
columnType: ColumnType[JvmType]): Unit = {
columnType: ColumnType[JvmType],
testTypeName: Option[String] = None): Unit = {

val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
val typeName = testTypeName.getOrElse(columnType.getClass.getSimpleName.stripSuffix("$"))
val dataType = columnType.dataType
val proj = UnsafeProjection.create(Array[DataType](
ColumnarDataTypeUtils.toLogicalDataType(dataType)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,17 @@ class DictionaryEncodingSuite extends SparkFunSuite {
"UTF8_BINARY", "UTF8_LCASE", "UNICODE", "UNICODE_CI"
).foreach(collation => {
val dt = StringType(collation)
testDictionaryEncoding(new StringColumnStats(dt), STRING(dt), false)
val typeName = if (collation == "UTF8_BINARY") "STRING" else s"STRING($collation)"
testDictionaryEncoding(new StringColumnStats(dt), STRING(dt), false, Some(typeName))
})

def testDictionaryEncoding[T <: PhysicalDataType](
columnStats: ColumnStats,
columnType: NativeColumnType[T],
testDecompress: Boolean = true): Unit = {
testDecompress: Boolean = true,
testTypeName: Option[String] = None): Unit = {

val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
val typeName = testTypeName.getOrElse(columnType.getClass.getSimpleName.stripSuffix("$"))

def buildDictionary(buffer: ByteBuffer) = {
(0 until buffer.getInt()).map(columnType.extract(buffer) -> _.toShort).toMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,17 @@ class RunLengthEncodingSuite extends SparkFunSuite {
"UTF8_BINARY", "UTF8_LCASE", "UNICODE", "UNICODE_CI"
).foreach(collation => {
val dt = StringType(collation)
testRunLengthEncoding(new StringColumnStats(dt), STRING(dt), false)
val typeName = if (collation == "UTF8_BINARY") "STRING" else s"STRING($collation)"
testRunLengthEncoding(new StringColumnStats(dt), STRING(dt), false, Some(typeName))
})

def testRunLengthEncoding[T <: PhysicalDataType](
columnStats: ColumnStats,
columnType: NativeColumnType[T],
testDecompress: Boolean = true): Unit = {
testDecompress: Boolean = true,
testTypeName: Option[String] = None): Unit = {

val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
val typeName = testTypeName.getOrElse(columnType.getClass.getSimpleName.stripSuffix("$"))

def skeleton(uniqueValueCount: Int, inputRuns: Seq[(Int, Int)]): Unit = {
// -------------
Expand Down