From 8a639bcc0ea70fd232a718e8ef164be20821b800 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 29 Jun 2016 01:10:49 -0700 Subject: [PATCH 1/4] [SPARK-16278][SPARK-16279][SQL] Implement map_keys/map_values SQL functions --- .../catalyst/analysis/FunctionRegistry.scala | 2 + .../expressions/collectionOperations.scala | 50 +++++++++++++++++++ .../CollectionFunctionsSuite.scala | 10 ++++ .../org/apache/spark/sql/functions.scala | 16 ++++++ .../spark/sql/DataFrameFunctionsSuite.scala | 24 +++++++++ .../spark/sql/hive/HiveSessionCatalog.scala | 1 - 6 files changed, 102 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 26b0c30db4e1..e7f335f4fb4e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -171,6 +171,8 @@ object FunctionRegistry { expression[IsNotNull]("isnotnull"), expression[Least]("least"), expression[CreateMap]("map"), + expression[MapKeys]("map_keys"), + expression[MapValues]("map_values"), expression[CreateNamedStruct]("named_struct"), expression[NaNvl]("nanvl"), expression[NullIf]("nullif"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index c71cb73d65bf..eec8a7e3185d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -43,6 +43,56 @@ case class Size(child: Expression) extends UnaryExpression with ExpectsInputType } } +/** + * Returns an unordered array containing the keys of the map. + */ +@ExpressionDescription( + usage = "_FUNC_(map) - Returns an unordered array containing the keys of the map.") +case class MapKeys(child: Expression) + extends UnaryExpression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(MapType) + + override def dataType: DataType = ArrayType(child.dataType.asInstanceOf[MapType].keyType) + + override def foldable: Boolean = child.foldable + + override def nullSafeEval(map: Any): Any = { + map.asInstanceOf[MapData].keyArray().copy() + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + nullSafeCodeGen(ctx, ev, c => s"${ev.value} = ($c).keyArray().copy();") + } + + override def prettyName: String = "map_keys" +} + +/** + * Returns an unordered array containing the values of the map. + */ +@ExpressionDescription( + usage = "_FUNC_(map) - Returns an unordered array containing the values of the map.") +case class MapValues(child: Expression) + extends UnaryExpression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(MapType) + + override def dataType: DataType = ArrayType(child.dataType.asInstanceOf[MapType].valueType) + + override def foldable: Boolean = child.foldable + + override def nullSafeEval(map: Any): Any = { + map.asInstanceOf[MapData].valueArray().copy() + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + nullSafeCodeGen(ctx, ev, c => s"${ev.value} = ($c).valueArray().copy();") + } + + override def prettyName: String = "map_values" +} + /** * Sorts the input array in ascending / descending order according to the natural ordering of * the array elements and returns it. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionFunctionsSuite.scala index 1aae4678d627..d961398dd68f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionFunctionsSuite.scala @@ -44,6 +44,16 @@ class CollectionFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(Literal.create(null, ArrayType(StringType)), null) } + test("MapKeys/MapValues") { + val m0 = Literal.create(Map("a" -> "1", "b" -> "2"), MapType(StringType, StringType)) + val m1 = Literal.create(Map[String, String](), MapType(StringType, StringType)) + + checkEvaluation(MapKeys(m0), Seq("a", "b")) + checkEvaluation(MapValues(m0), Seq("1", "2")) + checkEvaluation(MapKeys(m1), Seq()) + checkEvaluation(MapValues(m1), Seq()) + } + test("Sort Array") { val a0 = Literal.create(Seq(2, 1, 3), ArrayType(IntegerType)) val a1 = Literal.create(Seq[Integer](), ArrayType(IntegerType)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index c8782df146df..f850aa488ed2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2760,6 +2760,22 @@ object functions { */ def size(e: Column): Column = withExpr { Size(e.expr) } + /** + * Returns the key array of the map. + * + * @group collection_funcs + * @since 2.1.0 + */ + def map_keys(e: Column): Column = withExpr { MapKeys(e.expr) } + + /** + * Returns the key array of the map. + * + * @group collection_funcs + * @since 2.1.0 + */ + def map_values(e: Column): Column = withExpr { MapValues(e.expr) } + /** * Sorts the input array for the given column in ascending order, * according to the natural ordering of the array elements. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index 73d77651a027..2ef68bfe72c3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -352,6 +352,30 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { ) } + test("map_keys/map_values function") { + val df = Seq( + (Map[Int, Int](1 -> 100, 2 -> 200), "x"), + (Map[Int, Int](), "y"), + (Map[Int, Int](1 -> 100, 2 -> 200, 3 -> 300), "z") + ).toDF("a", "b") + checkAnswer( + df.select(map_keys($"a")), + Seq(Row(Seq(1, 2)), Row(Seq.empty), Row(Seq(1, 2, 3))) + ) + checkAnswer( + df.selectExpr("map_keys(a)"), + Seq(Row(Seq(1, 2)), Row(Seq.empty), Row(Seq(1, 2, 3))) + ) + checkAnswer( + df.select(map_values($"a")), + Seq(Row(Seq(100, 200)), Row(Seq.empty), Row(Seq(100, 200, 300))) + ) + checkAnswer( + df.selectExpr("map_values(a)"), + Seq(Row(Seq(100, 200)), Row(Seq.empty), Row(Seq(100, 200, 300))) + ) + } + test("array contains function") { val df = Seq( (Seq[Int](1, 2), "x"), diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 1fffadbfcae6..53990b8e3b38 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -239,7 +239,6 @@ private[sql] class HiveSessionCatalog( // str_to_map, windowingtablefunction. private val hiveFunctions = Seq( "hash", "java_method", "histogram_numeric", - "map_keys", "map_values", "parse_url", "percentile", "percentile_approx", "reflect", "sentences", "stack", "str_to_map", "xpath", "xpath_double", "xpath_float", "xpath_int", "xpath_long", "xpath_number", "xpath_short", "xpath_string", From 8c1ea5b320c109cc9ff5a70837aab78a9f41ce11 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 30 Jun 2016 03:15:08 -0700 Subject: [PATCH 2/4] Remove redundant copy() and add function extended description. --- .../expressions/collectionOperations.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index eec8a7e3185d..4f7a30018cc3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -47,7 +47,8 @@ case class Size(child: Expression) extends UnaryExpression with ExpectsInputType * Returns an unordered array containing the keys of the map. */ @ExpressionDescription( - usage = "_FUNC_(map) - Returns an unordered array containing the keys of the map.") + usage = "_FUNC_(map) - Returns an unordered array containing the keys of the map.", + extended = " > SELECT _FUNC_(map(1, 'a', 2, 'b'));\n [1,2]") case class MapKeys(child: Expression) extends UnaryExpression with ExpectsInputTypes { @@ -58,11 +59,11 @@ case class MapKeys(child: Expression) override def foldable: Boolean = child.foldable override def nullSafeEval(map: Any): Any = { - map.asInstanceOf[MapData].keyArray().copy() + map.asInstanceOf[MapData].keyArray() } override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - nullSafeCodeGen(ctx, ev, c => s"${ev.value} = ($c).keyArray().copy();") + nullSafeCodeGen(ctx, ev, c => s"${ev.value} = ($c).keyArray();") } override def prettyName: String = "map_keys" @@ -72,7 +73,8 @@ case class MapKeys(child: Expression) * Returns an unordered array containing the values of the map. */ @ExpressionDescription( - usage = "_FUNC_(map) - Returns an unordered array containing the values of the map.") + usage = "_FUNC_(map) - Returns an unordered array containing the values of the map.", + extended = " > SELECT _FUNC_(map(1, 'a', 2, 'b'));\n [\"a\",\"b\"]") case class MapValues(child: Expression) extends UnaryExpression with ExpectsInputTypes { @@ -83,11 +85,11 @@ case class MapValues(child: Expression) override def foldable: Boolean = child.foldable override def nullSafeEval(map: Any): Any = { - map.asInstanceOf[MapData].valueArray().copy() + map.asInstanceOf[MapData].valueArray() } override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - nullSafeCodeGen(ctx, ev, c => s"${ev.value} = ($c).valueArray().copy();") + nullSafeCodeGen(ctx, ev, c => s"${ev.value} = ($c).valueArray();") } override def prettyName: String = "map_values" From c0dac4bacbd07d244ea716db076b7a638be31837 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 30 Jun 2016 10:38:54 -0700 Subject: [PATCH 3/4] Add a null map testcase and remove redundant implementation. --- .../spark/sql/catalyst/expressions/collectionOperations.scala | 4 ---- .../sql/catalyst/expressions/CollectionFunctionsSuite.scala | 3 +++ 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index 4f7a30018cc3..2e8ea1107cee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -56,8 +56,6 @@ case class MapKeys(child: Expression) override def dataType: DataType = ArrayType(child.dataType.asInstanceOf[MapType].keyType) - override def foldable: Boolean = child.foldable - override def nullSafeEval(map: Any): Any = { map.asInstanceOf[MapData].keyArray() } @@ -82,8 +80,6 @@ case class MapValues(child: Expression) override def dataType: DataType = ArrayType(child.dataType.asInstanceOf[MapType].valueType) - override def foldable: Boolean = child.foldable - override def nullSafeEval(map: Any): Any = { map.asInstanceOf[MapData].valueArray() } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionFunctionsSuite.scala index d961398dd68f..a5f784fdcc13 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionFunctionsSuite.scala @@ -47,11 +47,14 @@ class CollectionFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("MapKeys/MapValues") { val m0 = Literal.create(Map("a" -> "1", "b" -> "2"), MapType(StringType, StringType)) val m1 = Literal.create(Map[String, String](), MapType(StringType, StringType)) + val m2 = Literal.create(null, MapType(StringType, StringType)) checkEvaluation(MapKeys(m0), Seq("a", "b")) checkEvaluation(MapValues(m0), Seq("1", "2")) checkEvaluation(MapKeys(m1), Seq()) checkEvaluation(MapValues(m1), Seq()) + checkEvaluation(MapKeys(m2), null) + checkEvaluation(MapValues(m2), null) } test("Sort Array") { From 8db1e656f27aa1647fca7c86405959262c3365fd Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Thu, 30 Jun 2016 17:24:44 -0700 Subject: [PATCH 4/4] Remove from `sql.functions`. --- .../scala/org/apache/spark/sql/functions.scala | 16 ---------------- .../spark/sql/DataFrameFunctionsSuite.scala | 8 -------- 2 files changed, 24 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index f850aa488ed2..c8782df146df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2760,22 +2760,6 @@ object functions { */ def size(e: Column): Column = withExpr { Size(e.expr) } - /** - * Returns the key array of the map. - * - * @group collection_funcs - * @since 2.1.0 - */ - def map_keys(e: Column): Column = withExpr { MapKeys(e.expr) } - - /** - * Returns the key array of the map. - * - * @group collection_funcs - * @since 2.1.0 - */ - def map_values(e: Column): Column = withExpr { MapValues(e.expr) } - /** * Sorts the input array for the given column in ascending order, * according to the natural ordering of the array elements. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index 2ef68bfe72c3..0f6c49e75959 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -358,18 +358,10 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { (Map[Int, Int](), "y"), (Map[Int, Int](1 -> 100, 2 -> 200, 3 -> 300), "z") ).toDF("a", "b") - checkAnswer( - df.select(map_keys($"a")), - Seq(Row(Seq(1, 2)), Row(Seq.empty), Row(Seq(1, 2, 3))) - ) checkAnswer( df.selectExpr("map_keys(a)"), Seq(Row(Seq(1, 2)), Row(Seq.empty), Row(Seq(1, 2, 3))) ) - checkAnswer( - df.select(map_values($"a")), - Seq(Row(Seq(100, 200)), Row(Seq.empty), Row(Seq(100, 200, 300))) - ) checkAnswer( df.selectExpr("map_values(a)"), Seq(Row(Seq(100, 200)), Row(Seq.empty), Row(Seq(100, 200, 300)))