
Commit 9ebb3f2

Merge branch 'master' into barrier-udf
2 parents 9be4cad + fa9b6c3 commit 9ebb3f2

121 files changed (+2600, -640 lines)


connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 3 additions & 1 deletion
@@ -458,7 +458,9 @@ class DataFrameReader private[sql] (sparkSession: SparkSession) extends Logging
    */
   def table(tableName: String): DataFrame = {
     sparkSession.newDataFrame { builder =>
-      builder.getReadBuilder.getNamedTableBuilder.setUnparsedIdentifier(tableName)
+      builder.getReadBuilder.getNamedTableBuilder
+        .setUnparsedIdentifier(tableName)
+        .putAllOptions(extraOptions.toMap.asJava)
     }
   }
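With this change, options set on the reader are forwarded to the named-table read relation instead of being dropped. A minimal usage sketch (the `spark` session value is illustrative; the options and table name mirror the test added to PlanGenerationTestSuite below):

// Reader options now travel with the NamedTable relation in the Connect plan.
val df = spark.read
  .options(Map("p1" -> "v1", "p2" -> "v2"))
  .table("tempdb.myTable")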

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 70 additions & 6 deletions
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveLongEncoder, ProductEncoder, StringEncoder, UnboundRowEncoder}
 import org.apache.spark.sql.catalyst.expressions.RowOrdering
 import org.apache.spark.sql.connect.client.SparkResult
-import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, StorageLevelProtoConverter}
 import org.apache.spark.sql.functions.{struct, to_json}
 import org.apache.spark.sql.types.{Metadata, StructType}
 import org.apache.spark.storage.StorageLevel
@@ -2771,22 +2771,86 @@ class Dataset[T] private[sql] (
     new DataFrameWriterV2[T](table, this)
   }

+  /**
+   * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
+   *
+   * @group basic
+   * @since 3.4.0
+   */
   def persist(): this.type = {
-    throw new UnsupportedOperationException("persist is not implemented.")
+    sparkSession.analyze { builder =>
+      builder.getPersistBuilder.setRelation(plan.getRoot)
+    }
+    this
   }

+  /**
+   * Persist this Dataset with the given storage level.
+   *
+   * @param newLevel
+   *   One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`, `MEMORY_AND_DISK_SER`,
+   *   `DISK_ONLY`, `MEMORY_ONLY_2`, `MEMORY_AND_DISK_2`, etc.
+   * @group basic
+   * @since 3.4.0
+   */
   def persist(newLevel: StorageLevel): this.type = {
-    throw new UnsupportedOperationException("persist is not implemented.")
+    sparkSession.analyze { builder =>
+      builder.getPersistBuilder
+        .setRelation(plan.getRoot)
+        .setStorageLevel(StorageLevelProtoConverter.toConnectProtoType(newLevel))
+    }
+    this
   }

+  /**
+   * Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk. This
+   * will not un-persist any cached data that is built upon this Dataset.
+   *
+   * @param blocking
+   *   Whether to block until all blocks are deleted.
+   * @group basic
+   * @since 3.4.0
+   */
   def unpersist(blocking: Boolean): this.type = {
-    throw new UnsupportedOperationException("unpersist() is not implemented.")
+    sparkSession.analyze { builder =>
+      builder.getUnpersistBuilder
+        .setRelation(plan.getRoot)
+        .setBlocking(blocking)
+    }
+    this
   }

+  /**
+   * Mark the Dataset as non-persistent, and remove all blocks for it from memory and disk. This
+   * will not un-persist any cached data that is built upon this Dataset.
+   *
+   * @group basic
+   * @since 3.4.0
+   */
   def unpersist(): this.type = unpersist(blocking = false)

-  def cache(): this.type = {
-    throw new UnsupportedOperationException("cache() is not implemented.")
+  /**
+   * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  def cache(): this.type = persist()
+
+  /**
+   * Get the Dataset's current storage level, or StorageLevel.NONE if not persisted.
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  def storageLevel: StorageLevel = {
+    StorageLevelProtoConverter.toStorageLevel(
+      sparkSession
+        .analyze { builder =>
+          builder.getGetStorageLevelBuilder.setRelation(plan.getRoot)
+        }
+        .getGetStorageLevel
+        .getStorageLevel)
   }

   def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = {
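These methods replace the earlier UnsupportedOperationException stubs: each now issues an AnalyzePlanRequest (Persist, Unpersist, or GetStorageLevel) against the plan root rather than touching local blocks. A minimal client-side sketch, assuming a Spark Connect Dataset named `df` (illustrative):

import org.apache.spark.storage.StorageLevel

df.persist(StorageLevel.DISK_ONLY)         // sent as a Persist request with an explicit level
val level: StorageLevel = df.storageLevel  // fetched from the server; StorageLevel.NONE if not persisted
df.unpersist(blocking = true)              // sent as an Unpersist request, waiting for block removal
df.cache()                                 // shorthand for persist() with the default MEMORY_AND_DISK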

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 7 additions & 0 deletions
@@ -400,6 +400,13 @@ class SparkSession private[sql] (
     client.analyze(method, Some(plan), explainMode)
   }

+  private[sql] def analyze(
+      f: proto.AnalyzePlanRequest.Builder => Unit): proto.AnalyzePlanResponse = {
+    val builder = proto.AnalyzePlanRequest.newBuilder()
+    f(builder)
+    client.analyze(builder)
+  }
+
   private[sql] def sameSemantics(plan: proto.Plan, otherPlan: proto.Plan): Boolean = {
     client.sameSemantics(plan, otherPlan).getSameSemantics.getResult
   }
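This overload lets callers populate an arbitrary AnalyzePlanRequest instead of the fixed (method, plan, explainMode) form. For example, Dataset.persist() in this commit calls it as:

sparkSession.analyze { builder =>
  builder.getPersistBuilder.setRelation(plan.getRoot)
}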

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala

Lines changed: 2 additions & 1 deletion
@@ -166,7 +166,8 @@ private[sql] class SparkConnectClient(
     analyze(builder)
   }

-  private def analyze(builder: proto.AnalyzePlanRequest.Builder): proto.AnalyzePlanResponse = {
+  private[sql] def analyze(
+      builder: proto.AnalyzePlanRequest.Builder): proto.AnalyzePlanResponse = {
     val request = builder
       .setUserContext(userContext)
       .setSessionId(sessionId)

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala

Lines changed: 6 additions & 1 deletion
@@ -69,7 +69,7 @@ class PlanGenerationTestSuite
   // Borrowed from SparkFunSuite
   private val regenerateGoldenFiles: Boolean = System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1"

-  protected val queryFilePath: Path = commonResourcePath.resolve("queries")
+  protected val queryFilePath: Path = commonResourcePath.resolve("query-tests/queries")

   // A relative path to /connector/connect/server, used by `ProtoToParsedPlanTestSuite` to run
   // with the datasource.
@@ -2162,4 +2162,9 @@ class PlanGenerationTestSuite
   test("replace") {
     simple.na.replace[Long]("id", Map(1L -> 8L))
   }
+
+  /* Reader API */
+  test("table API with options") {
+    session.read.options(Map("p1" -> "v1", "p2" -> "v2")).table("tempdb.myTable")
+  }
 }

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -75,7 +75,7 @@ class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach {
   }

   private val CHUNK_SIZE: Int = 32 * 1024
-  protected def artifactFilePath: Path = baseResourcePath.resolve("artifact-tests")
+  protected def artifactFilePath: Path = commonResourcePath.resolve("artifact-tests")
   protected def artifactCrcPath: Path = artifactFilePath.resolve("crc")

   private def getCrcValues(filePath: Path): Seq[Long] = {

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala

Lines changed: 0 additions & 1 deletion
@@ -160,7 +160,6 @@ object CheckConnectJvmClientCompatibility {
     ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.flatMap"),
     ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.foreach"),
     ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.foreachPartition"),
-    ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.storageLevel"),
     ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.rdd"),
     ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.toJavaRDD"),
     ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.javaRDD"),

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/util/ConnectFunSuite.scala

Lines changed: 1 addition & 2 deletions
@@ -52,7 +52,6 @@ trait ConnectFunSuite extends AnyFunSuite { // scalastyle:ignore funsuite
       "common",
       "src",
       "test",
-      "resources",
-      "query-tests").toAbsolutePath
+      "resources").toAbsolutePath
   }
 }

connector/connect/common/src/main/protobuf/spark/connect/base.proto

Lines changed: 1 addition & 14 deletions
@@ -21,6 +21,7 @@ package spark.connect;

 import "google/protobuf/any.proto";
 import "spark/connect/commands.proto";
+import "spark/connect/common.proto";
 import "spark/connect/expressions.proto";
 import "spark/connect/relations.proto";
 import "spark/connect/types.proto";
@@ -54,20 +55,6 @@ message UserContext {
   repeated google.protobuf.Any extensions = 999;
 }

-// StorageLevel for persisting Datasets/Tables.
-message StorageLevel {
-  // (Required) Whether the cache should use disk or not.
-  bool use_disk = 1;
-  // (Required) Whether the cache should use memory or not.
-  bool use_memory = 2;
-  // (Required) Whether the cache should use off-heap or not.
-  bool use_off_heap = 3;
-  // (Required) Whether the cached data is deserialized or not.
-  bool deserialized = 4;
-  // (Required) The number of replicas.
-  int32 replication = 5;
-}
-
 // Request to perform plan analyze, optionally to explain the plan.
 message AnalyzePlanRequest {
   // (Required)
connector/connect/common/src/main/protobuf/spark/connect/common.proto (new file, matching the spark/connect/common.proto import added above)

Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = 'proto3';
+
+package spark.connect;
+
+option java_multiple_files = true;
+option java_package = "org.apache.spark.connect.proto";
+
+// StorageLevel for persisting Datasets/Tables.
+message StorageLevel {
+  // (Required) Whether the cache should use disk or not.
+  bool use_disk = 1;
+  // (Required) Whether the cache should use memory or not.
+  bool use_memory = 2;
+  // (Required) Whether the cache should use off-heap or not.
+  bool use_off_heap = 3;
+  // (Required) Whether the cached data is deserialized or not.
+  bool deserialized = 4;
+  // (Required) The number of replicas.
+  int32 replication = 5;
+}
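The generated bindings for this message live under org.apache.spark.connect.proto (per the java_package option above). A hedged sketch of how Spark's StorageLevel.MEMORY_AND_DISK would map onto these fields, assuming the standard protobuf-generated builder setters (illustrative; not code from this commit):

import org.apache.spark.connect.proto

// MEMORY_AND_DISK: on-heap, deserialized cache that spills to disk, one replica.
val memoryAndDisk = proto.StorageLevel
  .newBuilder()
  .setUseDisk(true)
  .setUseMemory(true)
  .setUseOffHeap(false)
  .setDeserialized(true)
  .setReplication(1)
  .build()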
