Skip to content

Conversation

@fjh100456
Copy link
Owner

What changes were proposed in this pull request?

Update the code

How was this patch tested?

No

Please review http://spark.apache.org/contributing.html before opening a pull request.

chetkhatri and others added 30 commits December 23, 2017 08:13
## What changes were proposed in this pull request?

SparkHive Scala Examples Improvement made:
* Writing DataFrame / DataSet to Hive Managed , Hive External table using different storage format.
* Implementation of Partition, Reparition, Coalesce with appropriate example.

## How was this patch tested?
* Patch has been tested manually and by running ./dev/run-tests.

Author: chetkhatri <[email protected]>

Closes #20018 from chetkhatri/scala-sparkhive-examples.
## What changes were proposed in this pull request?

Create table using the right DataFrame. peopleDF->usersDF

peopleDF:
+----+-------+
| age|   name|
+----+-------+
usersDF:
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+

## How was this patch tested?

Manually tested.

Author: CNRui <[email protected]>

Closes #20052 from CNRui/patch-2.
## What changes were proposed in this pull request?

This PR fixes a style that broke the build.

## How was this patch tested?

Manually tested.

Author: hyukjinkwon <[email protected]>

Closes #20065 from HyukjinKwon/minor-style.
## What changes were proposed in this pull request?

This PR adds `date_trunc` in R API as below:

```r
> df <- createDataFrame(list(list(a = as.POSIXlt("2012-12-13 12:34:00"))))
> head(select(df, date_trunc("hour", df$a)))
  date_trunc(hour, a)
1 2012-12-13 12:00:00
```

## How was this patch tested?

Unit tests added in `R/pkg/tests/fulltests/test_sparkSQL.R`.

Author: hyukjinkwon <[email protected]>

Closes #20031 from HyukjinKwon/r-datetrunc.
## What changes were proposed in this pull request?

Since all CRAN checks go through the same machine, if there is an older partial download or partial install of Spark left behind the tests fail. This PR overwrites the install files when running tests. This shouldn't affect Jenkins as `SPARK_HOME` is set when running Jenkins tests.

## How was this patch tested?

Test manually by running `R CMD check --as-cran`

Author: Shivaram Venkataraman <[email protected]>

Closes #20060 from shivaram/sparkr-overwrite-cran.
## What changes were proposed in this pull request?
In choosing a Partitioner to use for a cogroup-like operation between a number of RDDs, the default behaviour was if some of the RDDs already have a partitioner, we choose the one amongst them with the maximum number of partitions.

This behaviour, in some cases, could hit the 2G limit (SPARK-6235). To illustrate one such scenario, consider two RDDs:
rDD1: with smaller data and smaller number of partitions, alongwith a Partitioner.
rDD2: with much larger data and a larger number of partitions, without a Partitioner.

The cogroup of these two RDDs could hit the 2G limit, as a larger amount of data is shuffled into a smaller number of partitions.

This PR introduces a safety-check wherein the Partitioner is chosen only if either of the following conditions are met:
1. if the number of partitions of the RDD associated with the Partitioner is greater than or equal to the max number of upstream partitions; or
2. if the number of partitions of the RDD associated with the Partitioner is less than and within a single order of magnitude of the max number of upstream partitions.

## How was this patch tested?
Unit tests in PartitioningSuite and PairRDDFunctionsSuite

Author: sujithjay <[email protected]>

Closes #20002 from sujithjay/SPARK-22465.
… in fitting

## What changes were proposed in this pull request?

Via some test I found CrossValidator still exists memory issue, it will still occupy `O(n*sizeof(model))` memory for holding models when fitting, if well optimized, it should be `O(parallelism*sizeof(model))`

This is because modelFutures will hold the reference to model object after future is complete (we can use `future.value.get.get` to fetch it), and the `Future.sequence` and the `modelFutures` array holds references to each model future. So all model object are keep referenced. So it will still occupy `O(n*sizeof(model))` memory.

I fix this by merging the `modelFuture` and `foldMetricFuture` together, and use `atomicInteger` to statistic complete fitting tasks and when all done, trigger `trainingDataset.unpersist`.

I ever commented this issue on the old PR [SPARK-19357]
#16774 (review)
unfortunately, at that time I do not realize that the issue still exists, but now I confirm it and create this PR to fix it.

## Discussion
I give 3 approaches which we can compare, after discussion I realized none of them is ideal, we have to make a trade-off.

**After discussion with jkbradley , choose approach 3**

### Approach 1
~~The approach proposed by MrBago at~~ #19904 (comment)
~~This approach resolve the model objects referenced issue, allow the model objects to be GCed in time. **BUT, in some cases, it still do not resolve the O(N) model memory occupation issue**. Let me use an extreme case to describe it:~~
~~suppose we set `parallelism = 1`, and there're 100 paramMaps. So we have 100 fitting & evaluation tasks. In this approach, because of `parallelism = 1`, the code have to wait 100 fitting tasks complete, **(at this time the memory occupation by models already reach 100 * sizeof(model) )** and then it will unpersist training dataset and then do 100 evaluation tasks.~~

### Approach 2
~~This approach is my PR old version code~~ 2cc7c28
~~This approach can make sure at any case, the peak memory occupation by models to be `O(numParallelism * sizeof(model))`, but, it exists an issue that, in some extreme case, the "unpersist training dataset" will be delayed until most of the evaluation tasks complete. Suppose the case
 `parallelism = 1`, and there're 100 fitting & evaluation tasks, each fitting&evaluation task have to be executed one by one, so only after the first 99 fitting&evaluation tasks and the 100th fitting task complete, the "unpersist training dataset" will be triggered.~~

### Approach 3
After I compared approach 1 and approach 2, I realized that, in the case which parallelism is low but there're many fitting & evaluation tasks, we cannot achieve both of the following two goals:
- Make the peak memory occupation by models(driver-side) to be O(parallelism * sizeof(model))
- unpersist training dataset before most of the evaluation tasks started.

So I vote for a simpler approach, move the unpersist training dataset to the end (Does this really matters ?)
Because the goal 1 is more important, we must make sure the peak memory occupation by models (driver-side) to be O(parallelism * sizeof(model)), otherwise it will bring high risk of OOM.
Like following code:
```
      val foldMetricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
        Future[Double] {
          val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
          //...other minor codes
          val metric = eval.evaluate(model.transform(validationDataset, paramMap))
          logDebug(s"Got metric metricformodeltrainedwithparamMap.")
          metric
        } (executionContext)
      }
      val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
      trainingDataset.unpersist() // <------- unpersist at the end
      validationDataset.unpersist()
```

## How was this patch tested?

N/A

Author: WeichenXu <[email protected]>

Closes #19904 from WeichenXu123/fix_cross_validator_memory_issue.
## What changes were proposed in this pull request?

We should use `dataType.simpleString` to unified the data type mismatch message:
Before:
```
spark-sql> select cast(1 as binary);
Error in query: cannot resolve 'CAST(1 AS BINARY)' due to data type mismatch: cannot cast IntegerType to BinaryType; line 1 pos 7;
```
After:
```
park-sql> select cast(1 as binary);
Error in query: cannot resolve 'CAST(1 AS BINARY)' due to data type mismatch: cannot cast int to binary; line 1 pos 7;
```

## How was this patch tested?

Exist test.

Author: Yuming Wang <[email protected]>

Closes #20064 from wangyum/SPARK-22893.
…actual versions.

## What changes were proposed in this pull request?

This is a follow-up pr of #20054 modifying error messages for both pandas and pyarrow to show actual versions.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <[email protected]>

Closes #20074 from ueshin/issues/SPARK-22874_fup1.
…ic timestamp

## What changes were proposed in this pull request?

Kinesis client can resume from a specified timestamp while creating a stream. We should have option to pass a timestamp in config to allow kinesis to resume from the given timestamp.

The patch introduces a new `KinesisInitialPositionInStream` that takes the `InitialPositionInStream` with the `timestamp` information that can be used to resume kinesis fetches from the provided timestamp.

## How was this patch tested?

Unit Tests

cc : budde brkyvz

Author: Yash Sharma <[email protected]>

Closes #18029 from yssharma/ysharma/kcl_resume.
## What changes were proposed in this pull request?

Decimal type is not yet supported in `ArrowWriter`.
This is adding the decimal type support.

## How was this patch tested?

Added a test to `ArrowConvertersSuite`.

Author: Takuya UESHIN <[email protected]>

Closes #18754 from ueshin/issues/SPARK-21552.
## What changes were proposed in this pull request?

In SPARK-20586 the flag `deterministic` was added to Scala UDF, but it is not available for python UDF. This flag is useful for cases when the UDF's code can return different result with the same input. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query. This can lead to unexpected behavior.

This PR adds the deterministic flag, via the `asNondeterministic` method, to let the user mark the function as non-deterministic and therefore avoid the optimizations which might lead to strange behaviors.

## How was this patch tested?

Manual tests:
```
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import *
>>> df_br = spark.createDataFrame([{'name': 'hello'}])
>>> import random
>>> udf_random_col =  udf(lambda: int(100*random.random()), IntegerType()).asNondeterministic()
>>> df_br = df_br.withColumn('RAND', udf_random_col())
>>> random.seed(1234)
>>> udf_add_ten =  udf(lambda rand: rand + 10, IntegerType())
>>> df_br.withColumn('RAND_PLUS_TEN', udf_add_ten('RAND')).show()
+-----+----+-------------+
| name|RAND|RAND_PLUS_TEN|
+-----+----+-------------+
|hello|   3|           13|
+-----+----+-------------+

```

Author: Marco Gaido <[email protected]>
Author: Marco Gaido <[email protected]>

Closes #19929 from mgaido91/SPARK-22629.
## What changes were proposed in this pull request?
Some improvements:
1. Point out we are using both Spark SQ native syntax and HQL syntax in the example
2. Avoid using the same table name with temp view, to not confuse users.
3. Create the external hive table with a directory that already has data, which is a more common use case.
4. Remove the usage of `spark.sql.parquet.writeLegacyFormat`. This config was introduced by #8566 and has nothing to do with Hive.
5. Remove `repartition` and `coalesce` example. These 2 are not Hive specific, we should put them in a different example file. BTW they can't accurately control the number of output files, `spark.sql.files.maxRecordsPerFile` also controls it.

## How was this patch tested?

N/A

Author: Wenchen Fan <[email protected]>

Closes #20081 from cloud-fan/minor.
## What changes were proposed in this pull request?

`DateTimeOperations` accept [`StringType`](https://github.com/apache/spark/blob/ae998ec2b5548b7028d741da4813473dde1ad81e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala#L669),  but:

```
spark-sql> SELECT '2017-12-24' + interval 2 months 2 seconds;
Error in query: cannot resolve '(CAST('2017-12-24' AS DOUBLE) + interval 2 months 2 seconds)' due to data type mismatch: differing types in '(CAST('2017-12-24' AS DOUBLE) + interval 2 months 2 seconds)' (double and calendarinterval).; line 1 pos 7;
'Project [unresolvedalias((cast(2017-12-24 as double) + interval 2 months 2 seconds), None)]
+- OneRowRelation
spark-sql>
```

After this PR:
```
spark-sql> SELECT '2017-12-24' + interval 2 months 2 seconds;
2018-02-24 00:00:02
Time taken: 0.2 seconds, Fetched 1 row(s)

```

## How was this patch tested?

unit tests

Author: Yuming Wang <[email protected]>

Closes #20067 from wangyum/SPARK-22894.
…h spark sql or thriftserver

## What changes were proposed in this pull request?
fix table owner is null when create new table through spark sql

## How was this patch tested?
manual test.
1、first create a table
2、then select the table properties from mysql which connected to hive metastore

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: xu.wenchun <[email protected]>

Closes #20034 from BruceXu1991/SPARK-22846.
## What changes were proposed in this pull request?

This is a follow-up pr of #19884 updating setup.py file to add pyarrow dependency.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <[email protected]>

Closes #20089 from ueshin/issues/SPARK-22324/fup1.
## What changes were proposed in this pull request?

Test coverage for arithmetic operations leading to:

 1. Precision loss
 2. Overflow

Moreover, tests for casting bad string to other input types and for using bad string as operators of some functions.

## How was this patch tested?

added tests

Author: Marco Gaido <[email protected]>

Closes #20084 from mgaido91/SPARK-22904.
…g data failed.

## What changes were proposed in this pull request?

Fix OneVsRestModel transform on streaming data failed.

## How was this patch tested?

UT will be added soon, once #19979 merged. (Need a helper test method there)

Author: WeichenXu <[email protected]>

Closes #20077 from WeichenXu123/fix_ovs_model_transform.
…reduce entries for mutable state

## What changes were proposed in this pull request?

This PR addresses additional review comments in #19811

## How was this patch tested?

Existing test suites

Author: Kazuaki Ishizaki <[email protected]>

Closes #20036 from kiszk/SPARK-18066-followup.
## What changes were proposed in this pull request?

This PR moves Structured Streaming v2 APIs to streaming folder as following:
```
sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming
├── ContinuousReadSupport.java
├── ContinuousWriteSupport.java
├── MicroBatchReadSupport.java
├── MicroBatchWriteSupport.java
├── reader
│   ├── ContinuousDataReader.java
│   ├── ContinuousReader.java
│   ├── MicroBatchReader.java
│   ├── Offset.java
│   └── PartitionOffset.java
└── writer
    └── ContinuousWriter.java
```

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes #20093 from zsxwing/move.
…, gcs, etc.) in Kubernetes mode

## What changes were proposed in this pull request?

This PR expands the Kubernetes mode to be able to use remote dependencies on http/https endpoints, GCS, S3, etc. It adds steps for configuring and appending the Kubernetes init-container into the driver and executor pods for downloading remote dependencies.
[Init-containers](https://kubernetes.io/docs/concepts/workloads/pods/init-containers/), as the name suggests, are containers that are run to completion before the main containers start, and are often used to perform initialization tasks prior to starting the main containers. We use init-containers to localize remote application dependencies before the driver/executors start running. The code that the init-container runs is also included. This PR also adds a step to the driver and executors for mounting user-specified secrets that may store credentials for accessing data storage, e.g., S3 and Google Cloud Storage (GCS), into the driver and executors.

## How was this patch tested?

* The patch contains unit tests which are passing.
* Manual testing: `./build/mvn -Pkubernetes clean package` succeeded.
* Manual testing of the following cases:
  * [x] Running SparkPi using container-local spark-example jar.
  * [x] Running SparkPi using container-local spark-example jar with user-specific secret mounted.
  * [x] Running SparkPi using spark-example jar hosted remotely on an https endpoint.

cc rxin felixcheung mateiz (shepherd)
k8s-big-data SIG members & contributors: mccheah foxish ash211 ssuchter varunkatta kimoonkim erikerlandson tnachen ifilonenko liyinan926
reviewers: vanzin felixcheung jiangxb1987 mridulm

Author: Yinan Li <[email protected]>

Closes #19954 from liyinan926/init-container.
…rets

## What changes were proposed in this pull request?

This PR updates the Kubernetes documentation corresponding to the following features/changes in #19954.
* Ability to use remote dependencies through the init-container.
* Ability to mount user-specified secrets into the driver and executor pods.

vanzin jiangxb1987 foxish

Author: Yinan Li <[email protected]>

Closes #20059 from liyinan926/doc-update.
## What changes were proposed in this pull request?

This PR proposes to add `localCheckpoint(..)` in R API.

```r
df <- localCheckpoint(createDataFrame(iris))
```

## How was this patch tested?

Unit tests added in `R/pkg/tests/fulltests/test_sparkSQL.R`

Author: hyukjinkwon <[email protected]>

Closes #20073 from HyukjinKwon/SPARK-22843.
## What changes were proposed in this pull request?

This PR adds `setLocalProperty` and `getLocalProperty`in R.

```R
> df <- createDataFrame(iris)
> setLocalProperty("spark.job.description", "Hello world!")
> count(df)
> setLocalProperty("spark.job.description", "Hi !!")
> count(df)
```

<img width="775" alt="2017-12-25 4 18 07" src="https://user-images.githubusercontent.com/6477701/34335213-60655a7c-e990-11e7-88aa-12debe311627.png">

```R
> print(getLocalProperty("spark.job.description"))
NULL
> setLocalProperty("spark.job.description", "Hello world!")
> print(getLocalProperty("spark.job.description"))
[1] "Hello world!"
> setLocalProperty("spark.job.description", "Hi !!")
> print(getLocalProperty("spark.job.description"))
[1] "Hi !!"
```

## How was this patch tested?

Manually tested and a test in `R/pkg/tests/fulltests/test_context.R`.

Author: hyukjinkwon <[email protected]>

Closes #20075 from HyukjinKwon/SPARK-21208.
## What changes were proposed in this pull request?

I found this problem while auditing the analyzer code. It's dangerous to introduce extra `AnalysisBarrer` during analysis, as the plan inside it will bypass all analysis afterward, which may not be expected. We should only preserve `AnalysisBarrer` but not introduce new ones.

## How was this patch tested?

existing tests

Author: Wenchen Fan <[email protected]>

Closes #20094 from cloud-fan/barrier.
…l columns

## What changes were proposed in this pull request?

For empty/null column, the result of `ApproximatePercentile` is null. Then in `ApproxCountDistinctForIntervals`, a `MatchError` (for `endpoints`) will be thrown if we try to generate histogram for that column. Besides, there is no need to generate histogram for such column. In this patch, we exclude such column when generating histogram.

## How was this patch tested?

Enhanced test cases for empty/null columns.

Author: Zhenhua Wang <[email protected]>

Closes #20102 from wzhfy/no_record_hgm_bug.
## What changes were proposed in this pull request?

This PR cleans up a few Java linter errors for Apache Spark 2.3 release.

## How was this patch tested?

```bash
$ dev/lint-java
Using `mvn` from path: /usr/local/bin/mvn
Checkstyle checks passed.
```

We can see the result from [Travis CI](https://travis-ci.org/dongjoon-hyun/spark/builds/322470787), too.

Author: Dongjoon Hyun <[email protected]>

Closes #20101 from dongjoon-hyun/fix-java-lint.
## What changes were proposed in this pull request?

Add tarLongFileMode=posix configuration for the assembly plugin

## How was this patch tested?

Reran build successfully
```
./build/mvn package -Pbigtop-dist -DskipTests -rf :spark-assembly_2.11
[INFO] Spark Project Assembly ............................. SUCCESS [ 23.082 s]
```

Author: Gera Shegalov <[email protected]>

Closes #20055 from gerashegalov/gera/tarLongFileMode.
Port code from the old executors listener to the new one, so that
the driver logs present in the application start event are kept.

Author: Marcelo Vanzin <[email protected]>

Closes #20038 from vanzin/SPARK-22836.
felixcheung and others added 26 commits January 3, 2018 21:43
…rigger, partitionBy

## What changes were proposed in this pull request?

R Structured Streaming API for withWatermark, trigger, partitionBy

## How was this patch tested?

manual, unit tests

Author: Felix Cheung <[email protected]>

Closes #20129 from felixcheung/rwater.
## What changes were proposed in this pull request?

ChildFirstClassLoader's parent is set to null, so we can't get jars from its parent. This will cause ClassNotFoundException during HiveClient initialization with builtin hive jars, where we may should use spark context loader instead.

## How was this patch tested?

add new ut
cc cloud-fan gatorsmile

Author: Kent Yao <[email protected]>

Closes #20145 from yaooqinn/SPARK-22950.
## What changes were proposed in this pull request?

Currently Scala users can use UDF like
```
val foo = udf((i: Int) => Math.random() + i).asNondeterministic
df.select(foo('a))
```
Python users can also do it with similar APIs. However Java users can't do it, we should add Java UDF APIs in the functions object.

## How was this patch tested?

new tests

Author: Wenchen Fan <[email protected]>

Closes #20141 from cloud-fan/udf.
## What changes were proposed in this pull request?
```Python
import random
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType
random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()
spark.catalog.registerFunction("random_udf", random_udf, StringType())
spark.sql("SELECT random_udf()").collect()
```

We will get the following error.
```
Py4JError: An error occurred while calling o29.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
```

This PR is to support it.

## How was this patch tested?
WIP

Author: gatorsmile <[email protected]>

Closes #20137 from gatorsmile/registerFunction.
…tDataTypes

## What changes were proposed in this pull request?
This pr is a follow-up to fix a bug left in #19977.

## How was this patch tested?
Added tests in `StringExpressionsSuite`.

Author: Takeshi Yamamuro <[email protected]>

Closes #20149 from maropu/SPARK-22771-FOLLOWUP.
…ternal shuffle service

## What changes were proposed in this pull request?

This PR is the second attempt of #18684 , NIO's Files API doesn't override `skip` method for `InputStream`, so it will bring in performance issue (mentioned in #20119). But using `FileInputStream`/`FileOutputStream` will also bring in memory issue (https://dzone.com/articles/fileinputstream-fileoutputstream-considered-harmful), which is severe for long running external shuffle service. So here in this proposal, only fixing the external shuffle service related code.

## How was this patch tested?

Existing tests.

Author: jerryshao <[email protected]>

Closes #20144 from jerryshao/SPARK-21475-v2.
…ueues.

The code in LiveListenerBus was queueing events before start in the
queues themselves; so in situations like the following:

   bus.post(someEvent)
   bus.addToEventLogQueue(listener)
   bus.start()

"someEvent" would not be delivered to "listener" if that was the first
listener in the queue, because the queue wouldn't exist when the
event was posted.

This change buffers the events before starting the bus in the bus itself,
so that they can be delivered to all registered queues when the bus is
started.

Also tweaked the unit tests to cover the behavior above.

Author: Marcelo Vanzin <[email protected]>

Closes #20039 from vanzin/SPARK-22850.
…container is used

## What changes were proposed in this pull request?

User-specified secrets are mounted into both the main container and init-container (when it is used) in a Spark driver/executor pod, using the `MountSecretsBootstrap`. Because `MountSecretsBootstrap` always adds new secret volumes for the secrets to the pod, the same secret volumes get added twice, one when mounting the secrets to the main container, and the other when mounting the secrets to the init-container. This PR fixes the issue by separating `MountSecretsBootstrap.mountSecrets` out into two methods: `addSecretVolumes` for adding secret volumes to a pod and `mountSecrets` for mounting secret volumes to a container, respectively. `addSecretVolumes` is only called once for each pod, whereas `mountSecrets` is called individually for the main container and the init-container (if it is used).

Ref: apache-spark-on-k8s#594.

## How was this patch tested?
Unit tested and manually tested.

vanzin This replaces #20148.
hex108 foxish kimoonkim

Author: Yinan Li <[email protected]>

Closes #20159 from liyinan926/master.
- Make it possible to build images from a git clone.
- Make it easy to use minikube to test things.

Also fixed what seemed like a bug: the base image wasn't getting the tag
provided in the command line. Adding the tag allows users to use multiple
Spark builds in the same kubernetes cluster.

Tested by deploying images on minikube and running spark-submit from a dev
environment; also by building the images with different tags and verifying
"docker images" in minikube.

Author: Marcelo Vanzin <[email protected]>

Closes #20154 from vanzin/SPARK-22960.
## What changes were proposed in this pull request?

32bit Int was used for row rank.
That overflowed in a dataframe with more than 2B rows.

## How was this patch tested?

Added test, but ignored, as it takes 4 minutes.

Author: Juliusz Sompolski <[email protected]>

Closes #20152 from juliuszsompolski/SPARK-22957.
## What changes were proposed in this pull request?
This pr fixed the issue when casting arrays into strings;
```
scala> val df = spark.range(10).select('id.cast("integer")).agg(collect_list('id).as('ids))
scala> df.write.saveAsTable("t")
scala> sql("SELECT cast(ids as String) FROM t").show(false)
+------------------------------------------------------------------+
|ids                                                               |
+------------------------------------------------------------------+
|org.apache.spark.sql.catalyst.expressions.UnsafeArrayData8bc285df|
+------------------------------------------------------------------+
```

This pr modified the result into;
```
+------------------------------+
|ids                           |
+------------------------------+
|[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]|
+------------------------------+
```

## How was this patch tested?
Added tests in `CastSuite` and `SQLQuerySuite`.

Author: Takeshi Yamamuro <[email protected]>

Closes #20024 from maropu/SPARK-22825.
… memory tradeoff for TrainValidationSplit

## What changes were proposed in this pull request?

Avoid holding all models in memory for `TrainValidationSplit`.

## How was this patch tested?

Existing tests.

Author: Bago Amirbekian <[email protected]>

Closes #20143 from MrBago/trainValidMemoryFix.
## What changes were proposed in this pull request?

We missed enabling `spark.files` and `spark.jars` in #19954. The result is that remote dependencies specified through `spark.files` or `spark.jars` are not included in the list of remote dependencies to be downloaded by the init-container. This PR fixes it.

## How was this patch tested?

Manual tests.

vanzin This replaces #20157.

foxish

Author: Yinan Li <[email protected]>

Closes #20160 from liyinan926/SPARK-22757.
…onstraints

## What changes were proposed in this pull request?

#19201 introduced the following regression: given something like `df.withColumn("c", lit(2))`, we're no longer picking up `c === 2` as a constraint and infer filters from it when joins are involved, which may lead to noticeable performance degradation.

This patch re-enables this optimization by picking up Aliases of Literals in Projection lists as constraints and making sure they're not treated as aliased columns.

## How was this patch tested?

Unit test was added.

Author: Adrian Ionescu <[email protected]>

Closes #20155 from adrian-ionescu/constant_constraints.
… platforms that don't have wget

## What changes were proposed in this pull request?

Modified HiveExternalCatalogVersionsSuite.scala to use Utils.doFetchFile to download different versions of Spark binaries rather than launching wget as an external process.

On platforms that don't have wget installed, this suite fails with an error.

cloud-fan : would you like to check this change?

## How was this patch tested?

1) test-only of HiveExternalCatalogVersionsSuite on several platforms. Tested bad mirror, read timeout, and redirects.
2) ./dev/run-tests

Author: Bruce Robbins <[email protected]>

Closes #20147 from bersprockets/SPARK-22940-alt.
## What changes were proposed in this pull request?

Follow-up cleanups for the OneHotEncoderEstimator PR.  See some discussion in the original PR: #19527 or read below for what this PR includes:
* configedCategorySize: I reverted this to return an Array.  I realized the original setup (which I had recommended in the original PR) caused the whole model to be serialized in the UDF.
* encoder: I reorganized the logic to show what I meant in the comment in the previous PR.  I think it's simpler but am open to suggestions.

I also made some small style cleanups based on IntelliJ warnings.

## How was this patch tested?

Existing unit tests

Author: Joseph K. Bradley <[email protected]>

Closes #20132 from jkbradley/viirya-SPARK-13030.
## What changes were proposed in this pull request?

Register spark.history.ui.port as a known spark conf to be used in substitution expressions even if it's not set explicitly.

## How was this patch tested?

Added unit test to demonstrate the issue

Author: Gera Shegalov <[email protected]>
Author: Gera Shegalov <[email protected]>

Closes #20098 from gerashegalov/gera/register-SHS-port-conf.
## What changes were proposed in this pull request?
This pr modified `elt` to output binary for binary inputs.
`elt` in the current master always output data as a string. But, in some databases (e.g., MySQL), if all inputs are binary, `elt` also outputs binary (Also, this might be a small surprise).
This pr is related to #19977.

## How was this patch tested?
Added tests in `SQLQueryTestSuite` and `TypeCoercionSuite`.

Author: Takeshi Yamamuro <[email protected]>

Closes #20135 from maropu/SPARK-22937.
## What changes were proposed in this pull request?

This PR reverts the `ARG base_image` before `FROM` in the images of driver, executor, and init-container, introduced in #20154. The reason is Docker versions before 17.06 do not support this use (`ARG` before `FROM`).

## How was this patch tested?

Tested manually.

vanzin foxish kimoonkim

Author: Yinan Li <[email protected]>

Closes #20170 from liyinan926/master.
… for non-deterministic cases

## What changes were proposed in this pull request?

Add tests for using non deterministic UDFs in aggregate.

Update pandas_udf docstring w.r.t to determinism.

## How was this patch tested?
test_nondeterministic_udf_in_aggregate

Author: Li Jin <[email protected]>

Closes #20142 from icexelloss/SPARK-22930-pandas-udf-deterministic.
# What changes were proposed in this pull request?
1. Start HiveThriftServer2.
2. Connect to thriftserver through beeline.
3. Close the beeline.
4. repeat step2 and step 3 for many times.
we found there are many directories never be dropped under the path `hive.exec.local.scratchdir` and `hive.exec.scratchdir`, as we know the scratchdir has been added to deleteOnExit when it be created. So it means that the cache size of FileSystem `deleteOnExit` will keep increasing until JVM terminated.

In addition, we use `jmap -histo:live [PID]`
to printout the size of objects in HiveThriftServer2 Process, we can find the object `org.apache.spark.sql.hive.client.HiveClientImpl` and `org.apache.hadoop.hive.ql.session.SessionState` keep increasing even though we closed all the beeline connections, which may caused the leak of Memory.

# How was this patch tested?
manual tests

This PR follw-up the #19989

Author: zuotingbing <[email protected]>

Closes #20029 from zuotingbing/SPARK-22793.
…quetOptions', `parquet.compression` needs to be considered.

[SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 'ParquetOptions', `parquet.compression` needs to be considered.

## What changes were proposed in this pull request?
Since Hive 1.1, Hive allows users to set parquet compression codec via table-level properties parquet.compression. See the JIRA: https://issues.apache.org/jira/browse/HIVE-7858 . We do support orc.compression for ORC. Thus, for external users, it is more straightforward to support both. See the stackflow question: https://stackoverflow.com/questions/36941122/spark-sql-ignores-parquet-compression-propertie-specified-in-tblproperties
In Spark side, our table-level compression conf compression was added by #11464 since Spark 2.0.
We need to support both table-level conf. Users might also use session-level conf spark.sql.parquet.compression.codec. The priority rule will be like
If other compression codec configuration was found through hive or parquet, the precedence would be compression, parquet.compression, spark.sql.parquet.compression.codec. Acceptable values include: none, uncompressed, snappy, gzip, lzo.
The rule for Parquet is consistent with the ORC after the change.

Changes:
1.Increased acquiring 'compressionCodecClassName' from `parquet.compression`,and the precedence order is `compression`,`parquet.compression`,`spark.sql.parquet.compression.codec`, just like what we do in `OrcOptions`.

2.Change `spark.sql.parquet.compression.codec` to support "none".Actually in `ParquetOptions`,we do support "none" as equivalent to "uncompressed", but it does not allowed to configured to "none".

3.Change `compressionCode` to `compressionCodecClassName`.

## How was this patch tested?
Add test.

Author: fjh100456 <[email protected]>

Closes #20076 from fjh100456/ParquetOptionIssue.
…or wrapped UDF function

## What changes were proposed in this pull request?

This PR wraps the `asNondeterministic` attribute in the wrapped UDF function to set the docstring properly.

```python
from pyspark.sql.functions import udf
help(udf(lambda x: x).asNondeterministic)
```

Before:

```
Help on function <lambda> in module pyspark.sql.udf:

<lambda> lambda
(END
```

After:

```
Help on function asNondeterministic in module pyspark.sql.udf:

asNondeterministic()
    Updates UserDefinedFunction to nondeterministic.

    .. versionadded:: 2.3
(END)
```

## How was this patch tested?

Manually tested and a simple test was added.

Author: hyukjinkwon <[email protected]>

Closes #20173 from HyukjinKwon/SPARK-22901-followup.
## What changes were proposed in this pull request?
This PR is to fix the  style checking failure.

## How was this patch tested?
N/A

Author: gatorsmile <[email protected]>

Closes #20175 from gatorsmile/stylefix.
## What changes were proposed in this pull request?
This pr fixed the issue when casting maps into strings;
```
scala> Seq(Map(1 -> "a", 2 -> "b")).toDF("a").write.saveAsTable("t")
scala> sql("SELECT cast(a as String) FROM t").show(false)
+----------------------------------------------------------------+
|a                                                               |
+----------------------------------------------------------------+
|org.apache.spark.sql.catalyst.expressions.UnsafeMapData38bdd75d|
+----------------------------------------------------------------+
```
This pr modified the result into;
```
+----------------+
|a               |
+----------------+
|[1 -> a, 2 -> b]|
+----------------+
```

## How was this patch tested?
Added tests in `CastSuite`.

Author: Takeshi Yamamuro <[email protected]>

Closes #20166 from maropu/SPARK-22973.
@fjh100456 fjh100456 merged commit 516c0a1 into fjh100456:master Jan 8, 2018
fjh100456 added a commit that referenced this pull request Jan 11, 2018
Merge from apache/spark
fjh100456 pushed a commit that referenced this pull request Apr 20, 2018
## What changes were proposed in this pull request?

There were two related fixes regarding `from_json`, `get_json_object` and `json_tuple` ([Fix #1](apache@c8803c0),
 [Fix #2](apache@86174ea)), but they weren't comprehensive it seems. I wanted to extend those fixes to all the parsers, and add tests for each case.

## How was this patch tested?

Regression tests

Author: Burak Yavuz <[email protected]>

Closes apache#20302 from brkyvz/json-invfix.
fjh100456 pushed a commit that referenced this pull request Apr 20, 2018
## What changes were proposed in this pull request?

Solved two bugs to enable stream-stream self joins.

### Incorrect analysis due to missing MultiInstanceRelation trait
Streaming leaf nodes did not extend MultiInstanceRelation, which is necessary for the catalyst analyzer to convert the self-join logical plan DAG into a tree (by creating new instances of the leaf relations). This was causing the error `Failure when resolving conflicting references in Join:` (see JIRA for details).

### Incorrect attribute rewrite when splicing batch plans in MicroBatchExecution
When splicing the source's batch plan into the streaming plan (by replacing the StreamingExecutionPlan), we were rewriting the attribute reference in the streaming plan with the new attribute references from the batch plan. This was incorrectly handling the scenario when multiple StreamingExecutionRelation point to the same source, and therefore eventually point to the same batch plan returned by the source. Here is an example query, and its corresponding plan transformations.
```
val df = input.toDF
val join =
      df.select('value % 5 as "key", 'value).join(
        df.select('value % 5 as "key", 'value), "key")
```
Streaming logical plan before splicing the batch plan
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- StreamingExecutionRelation Memory[#1], value#12  // two different leaves pointing to same source
```
Batch logical plan after splicing the batch plan and before rewriting
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- LocalRelation [value#66]           // replaces StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- LocalRelation [value#66]           // replaces StreamingExecutionRelation Memory[#1], value#12
```
Batch logical plan after rewriting the attributes. Specifically, for spliced, the new output attributes (value#66) replace the earlier output attributes (value#12, and value#1, one for each StreamingExecutionRelation).
```
Project [key#6, value#66, value#66]       // both value#1 and value#12 replaces by value#66
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9, value#66]
      +- LocalRelation [value#66]
```
This causes the optimizer to eliminate value#66 from one side of the join.
```
Project [key#6, value#66, value#66]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9]   // this does not generate value, incorrect join results
      +- LocalRelation [value#66]
```

**Solution**: Instead of rewriting attributes, use a Project to introduce aliases between the output attribute references and the new reference generated by the spliced plans. The analyzer and optimizer will take care of the rest.
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- Project [value#66 AS value#1]   // solution: project with aliases
   :     +- LocalRelation [value#66]
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- Project [value#66 AS value#12]    // solution: project with aliases
         +- LocalRelation [value#66]
```

## How was this patch tested?
New unit test

Author: Tathagata Das <[email protected]>

Closes apache#20598 from tdas/SPARK-23406.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.