
Commit 45685e4

Merge remote-tracking branch 'origin/master' into os/inmemoryrelation-str
2 parents cf2eae2 + f59de52

File tree

72 files changed (+1616, -816 lines)


bin/docker-image-tool.sh

Lines changed: 1 addition & 1 deletion
@@ -135,7 +135,7 @@ BASEDOCKERFILE=
 PYDOCKERFILE=
 NOCACHEARG=
 BUILD_PARAMS=
-while getopts f:mr:t:n:b: option
+while getopts f:p:mr:t:n:b: option
 do
  case "${option}"
  in

docs/ml-features.md

Lines changed: 22 additions & 3 deletions
@@ -585,7 +585,11 @@ for more details on the API.
 ## StringIndexer

 `StringIndexer` encodes a string column of labels to a column of label indices.
-The indices are in `[0, numLabels)`, ordered by label frequencies, so the most frequent label gets index `0`.
+The indices are in `[0, numLabels)`, and four ordering options are supported:
+"frequencyDesc": descending order by label frequency (most frequent label assigned 0),
+"frequencyAsc": ascending order by label frequency (least frequent label assigned 0),
+"alphabetDesc": descending alphabetical order, and "alphabetAsc": ascending alphabetical order
+(default = "frequencyDesc").
 The unseen labels will be put at index numLabels if user chooses to keep them.
 If the input column is numeric, we cast it to string and index the string
 values. When downstream pipeline components such as `Estimator` or
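For context, a minimal sketch of the new ordering behaviour (not part of this commit), assuming a `SparkSession` named `spark` and the `setStringOrderType` setter on `org.apache.spark.ml.feature.StringIndexer`:

```scala
import org.apache.spark.ml.feature.StringIndexer

// Toy data: 'b' occurs three times, 'a' twice, 'c' once.
val df = spark.createDataFrame(
  Seq((0, "b"), (1, "a"), (2, "b"), (3, "a"), (4, "c"), (5, "b"))
).toDF("id", "category")

// "alphabetAsc" assigns index 0 to the alphabetically first label ('a');
// the default "frequencyDesc" would assign index 0 to the most frequent label ('b').
val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .setStringOrderType("alphabetAsc")

indexer.fit(df).transform(df).show()
```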
@@ -1593,10 +1597,25 @@ Suppose `a` and `b` are double columns, we use the following simple examples to
 * `y ~ a + b + a:b - 1` means model `y ~ w1 * a + w2 * b + w3 * a * b` where `w1, w2, w3` are coefficients.

 `RFormula` produces a vector column of features and a double or string column of label.
-Like when formulas are used in R for linear regression, string input columns will be one-hot encoded, and numeric columns will be cast to doubles.
-If the label column is of type string, it will be first transformed to double with `StringIndexer`.
+Like when formulas are used in R for linear regression, numeric columns will be cast to doubles.
+As to string input columns, they will first be transformed with [StringIndexer](ml-features.html#stringindexer) using ordering determined by `stringOrderType`,
+and the last category after ordering is dropped, then the doubles will be one-hot encoded.
+
+Suppose a string feature column containing values `{'b', 'a', 'b', 'a', 'c', 'b'}`, we set `stringOrderType` to control the encoding:
+~~~
+stringOrderType | Category mapped to 0 by StringIndexer | Category dropped by RFormula
+----------------|---------------------------------------|---------------------------------
+'frequencyDesc' | most frequent category ('b')          | least frequent category ('c')
+'frequencyAsc'  | least frequent category ('c')         | most frequent category ('b')
+'alphabetDesc'  | last alphabetical category ('c')      | first alphabetical category ('a')
+'alphabetAsc'   | first alphabetical category ('a')     | last alphabetical category ('c')
+~~~
+
+If the label column is of type string, it will be first transformed to double with [StringIndexer](ml-features.html#stringindexer) using `frequencyDesc` ordering.
 If the label column does not exist in the DataFrame, the output label column will be created from the specified response variable in the formula.

+**Note:** The ordering option `stringOrderType` is NOT used for the label column. When the label column is indexed, it uses the default descending frequency ordering in `StringIndexer`.
+
 **Examples**

 Assume that we have a DataFrame with the columns `id`, `country`, `hour`, and `clicked`:
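Stepping out of the diff for a moment, a hypothetical sketch (not part of this commit) of `stringOrderType` on `RFormula`, again assuming a `SparkSession` named `spark` and a `setStringOrderType` setter:

```scala
import org.apache.spark.ml.feature.RFormula

// Toy data mirroring the {'b', 'a', 'b', 'a', 'c', 'b'} example above.
val df = spark.createDataFrame(
  Seq((1.0, "b"), (2.0, "a"), (3.0, "b"), (4.0, "a"), (5.0, "c"), (6.0, "b"))
).toDF("label", "str")

// With "alphabetDesc", StringIndexer maps 'c' to index 0 and RFormula drops the
// first alphabetical category ('a') during one-hot encoding.
val formula = new RFormula()
  .setFormula("label ~ str")
  .setStringOrderType("alphabetDesc")

formula.fit(df).transform(df).select("str", "features", "label").show()
```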

docs/sql-programming-guide.md

Lines changed: 7 additions & 0 deletions
@@ -1407,6 +1407,13 @@ the following case-insensitive options:
     This is a JDBC writer related option. When <code>SaveMode.Overwrite</code> is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g., indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to <code>false</code>. This option applies only to writing.
    </td>
  </tr>
+
+  <tr>
+    <td><code>cascadeTruncate</code></td>
+    <td>
+      This is a JDBC writer related option. If enabled and supported by the JDBC database (PostgreSQL and Oracle at the moment), this option allows execution of a <code>TRUNCATE TABLE t CASCADE</code> (in the case of PostgreSQL a <code>TRUNCATE TABLE ONLY t CASCADE</code> is executed to prevent inadvertently truncating descendant tables). This will affect other tables, and thus should be used with care. This option applies only to writing. It defaults to the default cascading truncate behaviour of the JDBC database in question, specified in the <code>isCascadeTruncate</code> in each JDBCDialect.
+    </td>
+  </tr>

   <tr>
     <td><code>createTableOptions</code></td>
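For context, a minimal writer sketch (not part of this commit) combining the new option with `truncate`; the DataFrame `df`, URL, and credentials are placeholders:

```scala
import org.apache.spark.sql.SaveMode

// Overwrite by truncating the existing table; where the dialect supports it,
// cascadeTruncate issues TRUNCATE TABLE ... CASCADE instead.
df.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/testdb")
  .option("dbtable", "public.events")
  .option("user", "username")
  .option("password", "password")
  .option("truncate", "true")
  .option("cascadeTruncate", "true")
  .mode(SaveMode.Overwrite)
  .save()
```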

external/avro/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -61,6 +61,11 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-tags_${scala.binary.version}</artifactId>
Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericDatumReader
+import org.apache.avro.io.{BinaryDecoder, DecoderFactory}
+
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType}
+
+case class AvroDataToCatalyst(child: Expression, jsonFormatSchema: String)
+  extends UnaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
+
+  override lazy val dataType: DataType = SchemaConverters.toSqlType(avroSchema).dataType
+
+  override def nullable: Boolean = true
+
+  @transient private lazy val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
+
+  @transient private lazy val reader = new GenericDatumReader[Any](avroSchema)
+
+  @transient private lazy val deserializer = new AvroDeserializer(avroSchema, dataType)
+
+  @transient private var decoder: BinaryDecoder = _
+
+  @transient private var result: Any = _
+
+  override def nullSafeEval(input: Any): Any = {
+    val binary = input.asInstanceOf[Array[Byte]]
+    decoder = DecoderFactory.get().binaryDecoder(binary, 0, binary.length, decoder)
+    result = reader.read(result, decoder)
+    deserializer.deserialize(result)
+  }
+
+  override def simpleString: String = {
+    s"from_avro(${child.sql}, ${dataType.simpleString})"
+  }
+
+  override def sql: String = {
+    s"from_avro(${child.sql}, ${dataType.catalogString})"
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    val expr = ctx.addReferenceObj("this", this)
+    defineCodeGen(ctx, ev, input =>
+      s"(${CodeGenerator.boxedType(dataType)})$expr.nullSafeEval($input)")
+  }
+}
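A usage sketch (not part of this commit), assuming the public `Column(expr)` constructor is available and that a DataFrame `df` carries Avro-encoded bytes in a binary column `value`; the wrapper name `fromAvro` is illustrative:

```scala
import org.apache.spark.sql.{AvroDataToCatalyst, Column}

// Illustrative writer schema for the encoded payload.
val jsonFormatSchema =
  """{"type": "record", "name": "User", "fields": [
    |  {"name": "name", "type": "string"}, {"name": "age", "type": "int"}]}""".stripMargin

// Wrap the new expression so it can be used in DataFrame selects.
def fromAvro(data: Column, schema: String): Column =
  new Column(AvroDataToCatalyst(data.expr, schema))

// val decoded = df.select(fromAvro(df("value"), jsonFormatSchema).as("user"))
```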

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala

Lines changed: 13 additions & 22 deletions
@@ -58,21 +58,19 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     val conf = spark.sparkContext.hadoopConfiguration
-    val parsedOptions = new AvroOptions(options)
+    val parsedOptions = new AvroOptions(options, conf)

     // Schema evolution is not supported yet. Here we only pick a single random sample file to
     // figure out the schema of the whole dataset.
     val sampleFile =
-      if (AvroFileFormat.ignoreFilesWithoutExtensions(conf)) {
-        files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
-          throw new FileNotFoundException(
-            "No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" " +
-            " is set to true. Do all input files have \".avro\" extension?"
-          )
+      if (parsedOptions.ignoreExtension) {
+        files.headOption.getOrElse {
+          throw new FileNotFoundException("Files for schema inferring have been not found.")
         }
       } else {
-        files.headOption.getOrElse {
-          throw new FileNotFoundException("No Avro files found.")
+        files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
+          throw new FileNotFoundException(
+            "No Avro files found. If files don't have .avro extension, set ignoreExtension to true")
         }
       }

@@ -115,7 +113,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val parsedOptions = new AvroOptions(options)
+    val parsedOptions = new AvroOptions(options, spark.sessionState.newHadoopConf())
     val outputAvroSchema = SchemaConverters.toAvroType(
       dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace)

@@ -146,7 +144,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
         log.error(s"unsupported compression codec $unknown")
     }

-    new AvroOutputWriterFactory(dataSchema, new SerializableSchema(outputAvroSchema))
+    new AvroOutputWriterFactory(dataSchema, outputAvroSchema.toString)
   }

   override def buildReader(
@@ -160,7 +158,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {

     val broadcastedConf =
       spark.sparkContext.broadcast(new AvroFileFormat.SerializableConfiguration(hadoopConf))
-    val parsedOptions = new AvroOptions(options)
+    val parsedOptions = new AvroOptions(options, hadoopConf)

     (file: PartitionedFile) => {
       val log = LoggerFactory.getLogger(classOf[AvroFileFormat])
@@ -171,9 +169,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       // Doing input file filtering is improper because we may generate empty tasks that process no
       // input files but stress the scheduler. We should probably add a more general input file
       // filtering mechanism for `FileFormat` data sources. See SPARK-16317.
-      if (AvroFileFormat.ignoreFilesWithoutExtensions(conf) && !file.filePath.endsWith(".avro")) {
-        Iterator.empty
-      } else {
+      if (parsedOptions.ignoreExtension || file.filePath.endsWith(".avro")) {
        val reader = {
          val in = new FsInput(new Path(new URI(file.filePath)), conf)
          try {
@@ -228,6 +224,8 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
            deserializer.deserialize(record).asInstanceOf[InternalRow]
          }
        }
+      } else {
+        Iterator.empty
       }
     }
   }
@@ -274,11 +272,4 @@ private[avro] object AvroFileFormat {
       value.readFields(new DataInputStream(in))
     }
   }
-
-  def ignoreFilesWithoutExtensions(conf: Configuration): Boolean = {
-    // Files without .avro extensions are not ignored by default
-    val defaultValue = false
-
-    conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, defaultValue)
-  }
 }

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala

Lines changed: 26 additions & 3 deletions
@@ -17,16 +17,21 @@

 package org.apache.spark.sql.avro

+import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

 /**
  * Options for Avro Reader and Writer stored in case insensitive manner.
  */
-class AvroOptions(@transient val parameters: CaseInsensitiveMap[String])
-  extends Logging with Serializable {
+class AvroOptions(
+    @transient val parameters: CaseInsensitiveMap[String],
+    @transient val conf: Configuration) extends Logging with Serializable {

-  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String], conf: Configuration) = {
+    this(CaseInsensitiveMap(parameters), conf)
+  }

   /**
    * Optional schema provided by an user in JSON format.
@@ -45,4 +50,22 @@ class AvroOptions(@transient val parameters: CaseInsensitiveMap[String])
    * See Avro spec for details: https://avro.apache.org/docs/1.8.2/spec.html#schema_record .
    */
   val recordNamespace: String = parameters.getOrElse("recordNamespace", "")
+
+  /**
+   * The `ignoreExtension` option controls ignoring of files without `.avro` extensions in read.
+   * If the option is enabled, all files (with and without `.avro` extension) are loaded.
+   * If the option is not set, the Hadoop's config `avro.mapred.ignore.inputs.without.extension`
+   * is taken into account. If the former one is not set too, file extensions are ignored.
+   */
+  val ignoreExtension: Boolean = {
+    val ignoreFilesWithoutExtensionByDefault = false
+    val ignoreFilesWithoutExtension = conf.getBoolean(
+      AvroFileFormat.IgnoreFilesWithoutExtensionProperty,
+      ignoreFilesWithoutExtensionByDefault)
+
+    parameters
+      .get("ignoreExtension")
+      .map(_.toBoolean)
+      .getOrElse(!ignoreFilesWithoutExtension)
+  }
 }
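A reader sketch (not part of this commit) showing how the new `ignoreExtension` option is meant to be used, assuming a `SparkSession` named `spark` and the `avro` data source registered by this module:

```scala
// Load every file in the directory, regardless of extension. When the option is
// absent, the Hadoop property avro.mapred.ignore.inputs.without.extension is
// consulted instead, as documented in AvroOptions.ignoreExtension above.
val df = spark.read
  .format("avro")
  .option("ignoreExtension", "true")
  .load("/path/to/avro/dir")
```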

external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala

Lines changed: 11 additions & 3 deletions
@@ -17,21 +17,29 @@

 package org.apache.spark.sql.avro

+import org.apache.avro.Schema
 import org.apache.hadoop.mapreduce.TaskAttemptContext

 import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.types.StructType

+/**
+ * A factory that produces [[AvroOutputWriter]].
+ * @param catalystSchema Catalyst schema of input data.
+ * @param avroSchemaAsJsonString Avro schema of output result, in JSON string format.
+ */
 private[avro] class AvroOutputWriterFactory(
-    schema: StructType,
-    avroSchema: SerializableSchema) extends OutputWriterFactory {
+    catalystSchema: StructType,
+    avroSchemaAsJsonString: String) extends OutputWriterFactory {
+
+  private lazy val avroSchema = new Schema.Parser().parse(avroSchemaAsJsonString)

   override def getFileExtension(context: TaskAttemptContext): String = ".avro"

   override def newInstance(
       path: String,
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter = {
-    new AvroOutputWriter(path, context, schema, avroSchema.value)
+    new AvroOutputWriter(path, context, catalystSchema, avroSchema)
   }
 }
Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+
+import org.apache.avro.generic.GenericDatumWriter
+import org.apache.avro.io.{BinaryEncoder, EncoderFactory}
+
+import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
+import org.apache.spark.sql.types.{BinaryType, DataType}
+
+case class CatalystDataToAvro(child: Expression) extends UnaryExpression {
+
+  override def dataType: DataType = BinaryType
+
+  @transient private lazy val avroType =
+    SchemaConverters.toAvroType(child.dataType, child.nullable)
+
+  @transient private lazy val serializer =
+    new AvroSerializer(child.dataType, avroType, child.nullable)
+
+  @transient private lazy val writer =
+    new GenericDatumWriter[Any](avroType)
+
+  @transient private var encoder: BinaryEncoder = _
+
+  @transient private lazy val out = new ByteArrayOutputStream
+
+  override def nullSafeEval(input: Any): Any = {
+    out.reset()
+    encoder = EncoderFactory.get().directBinaryEncoder(out, encoder)
+    val avroData = serializer.serialize(input)
+    writer.write(avroData, encoder)
+    encoder.flush()
+    out.toByteArray
+  }
+
+  override def simpleString: String = {
+    s"to_avro(${child.sql}, ${child.dataType.simpleString})"
+  }
+
+  override def sql: String = {
+    s"to_avro(${child.sql}, ${child.dataType.catalogString})"
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    val expr = ctx.addReferenceObj("this", this)
+    defineCodeGen(ctx, ev, input =>
+      s"(byte[]) $expr.nullSafeEval($input)")
+  }
+}
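And the mirror-image sketch (not part of this commit) for serializing Catalyst data back to Avro bytes, again assuming the public `Column(expr)` constructor; the wrapper name `toAvro` is illustrative:

```scala
import org.apache.spark.sql.{CatalystDataToAvro, Column}
import org.apache.spark.sql.functions.struct

// Wrap the new expression so any Column can be serialized to Avro binary.
def toAvro(data: Column): Column = new Column(CatalystDataToAvro(data.expr))

// E.g. pack two columns into a struct and encode it, say before writing to Kafka:
// val encoded = df.select(toAvro(struct(df("name"), df("age"))).as("value"))
```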
