Merged
Changes from all commits (25 commits)
d94c65e
[SPARK-40717][CONNECT] Support Column Alias in the Connect DSL
amaliujia Oct 11, 2022
6c182da
[SPARK-40744][PS] Make `_reduce_for_stat_function` in `groupby` accep…
zhengruifeng Oct 11, 2022
8e31554
[SPARK-40742][CORE][SQL] Fix Java compilation warnings related to gen…
LuciferYang Oct 11, 2022
efd9ef9
[SPARK-40735] Consistently invoke bash with /usr/bin/env bash in scri…
huangxiaopingRD Oct 11, 2022
996e407
[SPARK-40663][SQL] Migrate execution errors onto error classes: _LEGA…
itholic Oct 11, 2022
1103d29
[MINOR] Fix grammar in error message
verhovsky Oct 11, 2022
6bbf4f5
[SPARK-40745][MLLIB] Reduce the shuffle size of ALS in `.mllib`
zhengruifeng Oct 11, 2022
6603b82
[SPARK-40585][SQL] Double-quoted identifiers should be available only…
gengliangwang Oct 11, 2022
8d8fac2
[MINOR][BUILD] Handle empty PR body in merge script
srowen Oct 11, 2022
1c6bd9e
[SPARK-38959][SQL] DS V2: Support runtime group filtering in row-leve…
aokolnychyi Oct 11, 2022
ce809c7
[SPARK-40654][SQL] Protobuf support for Spark - from_protobuf AND to_…
SandishKumarHN Oct 12, 2022
cb0d6ed
[SPARK-8731] Beeline doesn't work with -e option when started in back…
zhouyifan279 Oct 12, 2022
790a697
[SPARK-40416][SQL][FOLLOW-UP] Check error classes in subquery tests
allisonwang-db Oct 12, 2022
c1f54e7
[SPARK-40663][SQL] Migrate execution errors onto error classes: _LEGA…
itholic Oct 12, 2022
b9998cf
[SPARK-40697][SQL] Add read-side char padding to cover external data …
cloud-fan Oct 12, 2022
290e8a7
[SPARK-40729][CORE] Make Spark use `UnsafeFieldAccessor` as default f…
LuciferYang Oct 12, 2022
831bcb0
[SPARK-40361][SQL] Migrate arithmetic type check failures onto error …
lvshaokang Oct 12, 2022
0dd7834
[SPARK-40762][SQL][TESTS] Check error classes in `ErrorParserSuite`
MaxGekk Oct 12, 2022
e3c70cd
[SPARK-40663][SQL] Migrate execution errors onto error classes: _LEGA…
itholic Oct 12, 2022
8251f87
[SPARK-40772][SQL] Improve `spark.sql.adaptive.skewJoin.skewedPartiti…
dongjoon-hyun Oct 12, 2022
cb53e34
[MINOR][BUILD] Add `dist` dir to `fileset` of `maven-clean-plugin`
LuciferYang Oct 12, 2022
18f1f53
[SPARK-38774][PS][FOLLOW-UP] Make parameter name in `Series.autocorr`…
zhengruifeng Oct 13, 2022
45d537d
[SPARK-40724][PS][FOLLOW-UP] Simplify `corrwith` with method `inline`
zhengruifeng Oct 13, 2022
b337a6d
[SPARK-40766][BUILD] Upgrade the guava defined in `plugins.sbt` used …
LuciferYang Oct 13, 2022
7ba70f0
[SPARK-40765][SQL] Optimize redundant fs operation in `CommandUtils#c…
LuciferYang Oct 13, 2022
3 changes: 3 additions & 0 deletions .github/labeler.yml
@@ -155,3 +155,6 @@ CONNECT:
- "connector/connect/**/*"
- "**/sql/sparkconnect/**/*"
- "python/pyspark/sql/**/connect/**/*"
PROTOBUF:
- "connector/protobuf/**/*"
- "python/pyspark/sql/protobuf/**/*"
2 changes: 1 addition & 1 deletion R/check-cran.sh
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
2 changes: 1 addition & 1 deletion R/create-docs.sh
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
2 changes: 1 addition & 1 deletion R/create-rd.sh
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
2 changes: 1 addition & 1 deletion R/find-r.sh
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
2 changes: 1 addition & 1 deletion R/install-dev.sh
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
2 changes: 1 addition & 1 deletion R/install-source-package.sh
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
2 changes: 1 addition & 1 deletion R/run-tests.sh
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
5 changes: 5 additions & 0 deletions bin/load-spark-env.sh
@@ -63,3 +63,8 @@ if [ -z "$SPARK_SCALA_VERSION" ]; then
export SPARK_SCALA_VERSION=${SCALA_VERSION_2}
fi
fi

# Append jline option to enable the Beeline process to run in background.
if [[ ( ! $(ps -o stat= -p $$) =~ "+" ) && ! ( -p /dev/stdin ) ]]; then
export SPARK_BEELINE_OPTS="$SPARK_BEELINE_OPTS -Djline.terminal=jline.UnsupportedTerminal"
fi
2 changes: 1 addition & 1 deletion bin/sparkR
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
2 changes: 1 addition & 1 deletion binder/postBuild
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
SparkAvroKeyOutputFormat.java
@@ -25,6 +25,7 @@
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
@@ -53,7 +54,7 @@ protected RecordWriter<AvroKey<GenericRecord>, NullWritable> create(
CodecFactory compressionCodec,
OutputStream outputStream,
int syncInterval) throws IOException {
return new SparkAvroKeyRecordWriter(
return new SparkAvroKeyRecordWriter<>(
writerSchema, dataModel, compressionCodec, outputStream, syncInterval, metadata);
}
}
@@ -72,7 +73,7 @@ class SparkAvroKeyRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable>
OutputStream outputStream,
int syncInterval,
Map<String, String> metadata) throws IOException {
this.mAvroFileWriter = new DataFileWriter(dataModel.createDatumWriter(writerSchema));
this.mAvroFileWriter = new DataFileWriter<>(new GenericDatumWriter<>(writerSchema, dataModel));
for (Map.Entry<String, String> entry : metadata.entrySet()) {
this.mAvroFileWriter.setMeta(entry.getKey(), entry.getValue());
}
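For context on the fix above: SPARK-40742 replaces a raw-typed DataFileWriter construction with a properly parameterized one. A minimal standalone sketch of that typed construction (Scala against the plain Avro API; the one-field schema is invented for illustration):

import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}

// Parse a trivial record schema, then construct the writer the way the
// patched code does: GenericDatumWriter is parameterized on the record type,
// so DataFileWriter[GenericRecord] type-checks with no raw-type warning.
val schema = new Schema.Parser().parse(
  """{"type":"record","name":"r","fields":[{"name":"x","type":"int"}]}""")
val writer = new DataFileWriter[GenericRecord](
  new GenericDatumWriter[GenericRecord](schema, GenericData.get()))
writer.create(schema, new ByteArrayOutputStream())
writer.close()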
AvroSuite.scala
@@ -1075,7 +1075,7 @@ abstract class AvroSuite
.save(s"$tempDir/${UUID.randomUUID()}")
}.getMessage
assert(message.contains("Caused by: java.lang.NullPointerException: "))
assert(message.contains("null in string in field Name"))
assert(message.contains("null value for (non-nullable) string at test_schema.Name"))
}
}

2 changes: 2 additions & 0 deletions connector/connect/dev/generate_protos.sh
@@ -1,3 +1,5 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
spark/connect/expressions.proto
@@ -35,6 +35,7 @@ message Expression {
UnresolvedFunction unresolved_function = 3;
ExpressionString expression_string = 4;
UnresolvedStar unresolved_star = 5;
Alias alias = 6;
}

message Literal {
@@ -166,4 +167,9 @@
string name = 1;
DataType type = 2;
}

message Alias {
Expression expr = 1;
string name = 2;
}
}
org.apache.spark.sql.connect.dsl (package object)
@@ -40,6 +40,11 @@ package object dsl {
.build())
.build()
}

implicit class DslExpression(val expr: proto.Expression) {
def as(alias: String): proto.Expression = proto.Expression.newBuilder().setAlias(
proto.Expression.Alias.newBuilder().setName(alias).setExpr(expr)).build()
}
}

object plans { // scalastyle:ignore
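A hedged usage sketch of the new `as` helper (assuming the DslExpression implicit above is in scope; the receiver is an empty Expression only to keep the snippet self-contained):

import org.apache.spark.connect.proto
import org.apache.spark.sql.connect.dsl.expressions._

// Wrap any proto expression in an Alias node, as a Connect client would
// for `<expr> AS id2`.
val expr: proto.Expression = proto.Expression.newBuilder().build()
val aliased: proto.Expression = expr.as("id2")

assert(aliased.getAlias.getName == "id2")
assert(aliased.getAlias.getExpr == expr)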
SparkConnectPlanner.scala
@@ -24,7 +24,7 @@ import org.apache.spark.connect.proto
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.{logical, FullOuter, Inner, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.types._
@@ -132,6 +132,7 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {
transformUnresolvedExpression(exp)
case proto.Expression.ExprTypeCase.UNRESOLVED_FUNCTION =>
transformScalarFunction(exp.getUnresolvedFunction)
case proto.Expression.ExprTypeCase.ALIAS => transformAlias(exp.getAlias)
case _ => throw InvalidPlanInput()
}
}
@@ -208,6 +209,10 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {
}
}

private def transformAlias(alias: proto.Expression.Alias): Expression = {
Alias(transformExpression(alias.getExpr), alias.getName)()
}

private def transformUnion(u: proto.Union): LogicalPlan = {
assert(u.getInputsCount == 2, "Union must have 2 inputs")
val plan = logical.Union(transformRelation(u.getInputs(0)), transformRelation(u.getInputs(1)))
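On the server side, transformAlias maps the proto node onto Catalyst's ordinary Alias expression. A minimal sketch of the resulting node, using only public Catalyst constructors (the attribute name is illustrative):

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.Alias

// Equivalent of transformAlias for a proto Alias(expr = id, name = "id2"):
// the child comes from transformExpression, and the trailing empty argument
// list lets Catalyst allocate a fresh ExprId for the named expression.
val aliased = Alias(UnresolvedAttribute("id"), "id2")()
assert(aliased.name == "id2")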
SparkConnectProtoSuite.scala
@@ -81,6 +81,15 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
}
}

test("column alias") {
val connectPlan = {
import org.apache.spark.sql.connect.dsl.expressions._
import org.apache.spark.sql.connect.dsl.plans._
transform(connectTestRelation.select("id".protoAttr.as("id2")))
}
val sparkPlan = sparkTestRelation.select($"id".as("id2"))
}

test("Aggregate with more than 1 grouping expressions") {
val connectPlan = {
import org.apache.spark.sql.connect.dsl.expressions._
2 changes: 1 addition & 1 deletion connector/docker/build
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
2 changes: 1 addition & 1 deletion connector/docker/spark-test/build
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
2 changes: 1 addition & 1 deletion connector/docker/spark-test/master/default_cmd
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
2 changes: 1 addition & 1 deletion connector/docker/spark-test/worker/default_cmd
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
115 changes: 115 additions & 0 deletions connector/protobuf/pom.xml
@@ -0,0 +1,115 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.12</artifactId>
<version>3.4.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>spark-protobuf_2.12</artifactId>
<properties>
<sbt.project.name>protobuf</sbt.project.name>
<protobuf.version>3.21.1</protobuf.version>
</properties>
<packaging>jar</packaging>
<name>Spark Protobuf</name>
<url>https://spark.apache.org/</url>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
<!-- #if scala-2.13 --><!--
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
</dependency>
--><!-- #endif scala-2.13 -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
<scope>compile</scope>
</dependency>

</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<artifactSet>
<includes>
<include>com.google.protobuf:*</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>com.google.protobuf</pattern>
<shadedPattern>${spark.shade.packageName}.spark-protobuf.protobuf</shadedPattern>
<includes>
<include>com.google.protobuf.**</include>
</includes>
</relocation>
</relocations>
</configuration>
</plugin>
</plugins>
</build>
</project>
CatalystDataToProtobuf.scala
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.protobuf

import com.google.protobuf.DynamicMessage

import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.protobuf.utils.ProtobufUtils
import org.apache.spark.sql.types.{BinaryType, DataType}

private[protobuf] case class CatalystDataToProtobuf(
child: Expression,
descFilePath: String,
messageName: String)
extends UnaryExpression {

override def dataType: DataType = BinaryType

@transient private lazy val protoType =
ProtobufUtils.buildDescriptor(descFilePath, messageName)

@transient private lazy val serializer =
new ProtobufSerializer(child.dataType, protoType, child.nullable)

override def nullSafeEval(input: Any): Any = {
val dynamicMessage = serializer.serialize(input).asInstanceOf[DynamicMessage]
dynamicMessage.toByteArray
}

override def prettyName: String = "to_protobuf"

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val expr = ctx.addReferenceObj("this", this)
defineCodeGen(ctx, ev, input => s"(byte[]) $expr.nullSafeEval($input)")
}

override protected def withNewChildInternal(newChild: Expression): CatalystDataToProtobuf =
copy(child = newChild)
}
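A hedged usage sketch for the new expression (the descriptor path and message name are hypothetical placeholders; the snippet lives in org.apache.spark.sql.protobuf because the expression is private[protobuf]):

package org.apache.spark.sql.protobuf

import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.struct

object ToProtobufSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    // "/tmp/person.desc" and "Person" are placeholders; a real descriptor set
    // would come from `protoc --descriptor_set_out=person.desc person.proto`.
    val df = Seq(("alice", 1)).toDF("name", "age")
      .select(struct($"name", $"age").as("person"))

    // Serialize the struct column to protobuf binary via the new expression.
    val toProto = new Column(
      CatalystDataToProtobuf($"person".expr, "/tmp/person.desc", "Person"))
    df.select(toProto.as("proto_bytes")).show()
  }
}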