
Commit 96e5972

xingchaozh authored and wangyum committed
[CARMEL-3172] Upload Support (#62)
* [CARMEL-3172] Upload Support
* Fix code style
* fix ut
* fix ut
1 parent 4b05d88 commit 96e5972

15 files changed, +764 −10 lines


sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 11 additions & 0 deletions
@@ -227,6 +227,8 @@ statement
     | CLEAR CACHE                                                  #clearCache
     | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
         multipartIdentifier partitionSpec?                         #loadData
+    | UPLOAD DATA INPATH path=STRING OVERWRITE? INTO TABLE
+        tableIdentifier partitionSpec? optionSpec?                 #uploadData
     | TRUNCATE TABLE multipartIdentifier partitionSpec?            #truncateTable
     | MSCK REPAIR TABLE multipartIdentifier                        #repairTable
     | op=(ADD | LIST) identifier (STRING | .*?)                    #manageResource
@@ -254,6 +256,13 @@ statement
     | unsupportedHiveNativeCommands .*?                            #failNativeCommand
     ;
 
+optionSpec
+    : OPTION '(' optionVal (',' optionVal)* ')'
+    ;
+
+optionVal
+    : identifier (EQ constant)?
+    ;
 
 privilegeList
     : privilegeDef (',' privilegeDef)*
@@ -1248,6 +1257,7 @@ ansiNonReserved
     | UNLOCK
     | UNSET
     | UPDATE
+    | UPLOAD
     | USE
     | VALUES
     | VIEW
@@ -1789,6 +1799,7 @@ UNKNOWN: 'UNKNOWN';
 UNLOCK: 'UNLOCK';
 UNSET: 'UNSET';
 UPDATE: 'UPDATE';
+UPLOAD: 'UPLOAD';
 USE: 'USE';
 USER: 'USER';
 USING: 'USING';
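For reference, the new #uploadData rule accepts statements of the following shapes, sketched here as they would be submitted through a session (the table name, path, partition columns, and option keys are illustrative, not taken from this patch):

// Illustrative statement shapes for the new #uploadData rule; identifiers
// and option keys below are made up for the example.
val uploadExamples = Seq(
  "UPLOAD DATA INPATH '/staging/users.csv' INTO TABLE db.users",
  "UPLOAD DATA INPATH '/staging/users.csv' OVERWRITE INTO TABLE db.users " +
    "PARTITION (ds = '2021-01-01') OPTION (header = 'true', delimiter = ',')"
)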

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 38 additions & 0 deletions
@@ -529,6 +529,31 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }
 
+  /**
+   * Create an option specification map.
+   */
+  override def visitOptionSpec(
+      ctx: OptionSpecContext): Map[String, Option[String]] = withOrigin(ctx) {
+    val opts = ctx.optionVal.asScala.map { pVal =>
+      val name = pVal.identifier.getText
+      val value = Option(pVal.constant).map(visitStringConstant)
+      name -> value
+    }
+    checkDuplicateKeys(opts, ctx)
+    opts.toMap
+  }
+
+  /**
+   * Create an option specification map without optional values.
+   */
+  protected def visitNonOptionalOptionSpec(
+      ctx: OptionSpecContext): Map[String, String] = withOrigin(ctx) {
+    visitOptionSpec(ctx).map {
+      case (key, None) => throw new ParseException(s"Found an empty option key '$key'.", ctx)
+      case (key, Some(value)) => key -> value
+    }
+  }
+
   /**
    * Convert a constant of any type into a string. This is typically used in DDL commands, and its
    * main purpose is to prevent slight differences due to back to back conversions i.e.:
@@ -3310,6 +3335,19 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     )
   }
 
+  override def visitUploadData(ctx: UploadDataContext): LogicalPlan = withOrigin(ctx) {
+    if (!conf.hiveThriftServerDataUploadEnabled) {
+      throw new ParseException("UPLOAD DATA is not supported", ctx)
+    }
+    UploadDataStatement(
+      table = visitTableIdentifier(ctx.tableIdentifier),
+      path = string(ctx.path),
+      isOverwrite = ctx.OVERWRITE != null,
+      partitionSpec = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec),
+      optionSpec = Option(ctx.optionSpec()).map(visitNonOptionalOptionSpec)
+    )
+  }
+
   /**
    * Creates a [[ShowCreateTableStatement]]
    */
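Concretely, for OPTION(e='3', f='true') the first visitor yields Map("e" -> Some("3"), "f" -> Some("true")), and visitNonOptionalOptionSpec flattens that to Map("e" -> "3", "f" -> "true"), rejecting any key written without a value. A standalone sketch of the flattening step (the function name is illustrative; the real method runs inside the parser against an OptionSpecContext):

// Illustrative standalone version of the flattening done by
// visitNonOptionalOptionSpec; there, a value-less key is a parse error.
def flattenOptions(opts: Map[String, Option[String]]): Map[String, String] =
  opts.map {
    case (key, None) =>
      throw new IllegalArgumentException(s"Found an empty option key '$key'.")
    case (key, Some(value)) => key -> value
  }

// flattenOptions(Map("e" -> Some("3"), "f" -> Some("true")))
//   == Map("e" -> "3", "f" -> "true")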

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala

Lines changed: 10 additions & 0 deletions
@@ -390,6 +390,16 @@ case class LoadDataStatement(
     isOverwrite: Boolean,
     partition: Option[TablePartitionSpec]) extends ParsedStatement
 
+/**
+ * An UPLOAD DATA INTO TABLE statement, as parsed from SQL.
+ */
+case class UploadDataStatement(
+    table: TableIdentifier,
+    path: String,
+    isOverwrite: Boolean,
+    partitionSpec: Option[TablePartitionSpec],
+    optionSpec: Option[Map[String, String]]) extends ParsedStatement
+
 /**
  * A SHOW CREATE TABLE statement, as parsed from SQL.
  */
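The statement is a pure parse node with no execution logic. For "UPLOAD DATA INPATH 'path' INTO TABLE t" the parser produces the equivalent of the following (a sketch mirroring the parser test below; only the table name is made up):

import org.apache.spark.sql.catalyst.TableIdentifier

// What the parser builds for the minimal UPLOAD DATA form.
val stmt = UploadDataStatement(
  table = TableIdentifier("t"),
  path = "path",
  isOverwrite = false,
  partitionSpec = None,
  optionSpec = None)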

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 18 additions & 0 deletions
@@ -3348,6 +3348,24 @@ class SQLConf extends Serializable with Logging {
   def hiveThriftServerEventLogDir: String =
     getConf(StaticSQLConf.HIVE_THRIFT_SERVER_EVENTLOG_DIR)
 
+  def hiveThriftServerDataUploadEnabled: Boolean =
+    getConf(StaticSQLConf.HIVE_THRIFT_SERVER_DATA_UPLOAD_ENABLED)
+
+  def hiveThriftServerDataUploadDynamicPartitionEnabled: Boolean =
+    getConf(StaticSQLConf.HIVE_THRIFT_SERVER_DATA_UPLOAD_DYNAMIC_PARTITION_ENABLED)
+
+  def hiveThriftServerDataUploadTemporaryBaseDir: String =
+    getConf(StaticSQLConf.HIVE_THRIFT_SERVER_DATA_UPLOAD_TEMPORARY_BASE_DIR)
+
+  def hiveThriftServerDataUploadTemporaryFileMaxSize: Long =
+    getConf(StaticSQLConf.HIVE_THRIFT_SERVER_DATA_UPLOAD_TEMPORARY_FILE_MAX_SIZE)
+
+  def hiveThriftServerDataUploadWorkspaceBaseDir: String =
+    getConf(StaticSQLConf.HIVE_THRIFT_SERVER_DATA_UPLOAD_WORKSPACE_BASE_DIR)
+
+  def hiveThriftServerDataUploadNotAllowedCsvOptions: String =
+    getConf(StaticSQLConf.HIVE_THRIFT_SERVER_DATA_UPLOAD_NOT_ALLOWED_CSV_OPTIONS)
+
   def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
 
   def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
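These accessors are how runtime code consults the upload settings; visitUploadData above already gates on hiveThriftServerDataUploadEnabled. A sketch of the same gating pattern applied to the size limit (the helper and its messages are hypothetical, not part of this patch):

import org.apache.spark.sql.internal.SQLConf

// Hypothetical helper illustrating how a server-side handler might use
// the new accessors; not part of the patch.
def checkUploadAllowed(conf: SQLConf, fileSizeInBytes: Long): Unit = {
  if (!conf.hiveThriftServerDataUploadEnabled) {
    throw new UnsupportedOperationException("UPLOAD DATA is disabled")
  }
  if (fileSizeInBytes > conf.hiveThriftServerDataUploadTemporaryFileMaxSize) {
    throw new IllegalArgumentException(
      s"Uploaded file of $fileSizeInBytes bytes exceeds the configured maximum")
  }
}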

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala

Lines changed: 36 additions & 1 deletion
@@ -132,13 +132,48 @@ object StaticSQLConf {
     .doc("Path to record event log.")
     .stringConf
     .createWithDefault("/tmp/spark-events")
-
   val HIVE_THRIFT_SERVER_DATA_DOWNLOAD_MAX_SIZE =
     buildStaticConf("spark.sql.hive.thriftServer.data.download.max.size")
       .doc("The maximum data size allowed downloaded from Thrift server.")
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(100L * 1024 * 1024 * 1024) // 100G
 
+  val HIVE_THRIFT_SERVER_DATA_UPLOAD_ENABLED =
+    buildStaticConf("spark.sql.hive.thriftServer.data.upload.enabled")
+      .doc("Enable data upload.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val HIVE_THRIFT_SERVER_DATA_UPLOAD_DYNAMIC_PARTITION_ENABLED =
+    buildStaticConf("spark.sql.hive.thriftServer.data.upload.dynamic.partition.enabled")
+      .doc("Enable data upload for dynamic partition insert.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val HIVE_THRIFT_SERVER_DATA_UPLOAD_TEMPORARY_BASE_DIR =
+    buildStaticConf("spark.sql.hive.thriftServer.data.upload.temporary.base.dir")
+      .doc("Temporary path for data uploading.")
+      .stringConf
+      .createWithDefault("/tmp/spark-uploads")
+
+  val HIVE_THRIFT_SERVER_DATA_UPLOAD_TEMPORARY_FILE_MAX_SIZE =
+    buildStaticConf("spark.sql.hive.thriftServer.data.upload.temporary.file.max.size")
+      .doc("The maximum size in bytes of a single uploaded file.")
+      .longConf
+      .createWithDefault(200 * 1024 * 1024) // 200M
+
+  val HIVE_THRIFT_SERVER_DATA_UPLOAD_WORKSPACE_BASE_DIR =
+    buildStaticConf("spark.sql.hive.thriftServer.data.upload.workspace.base.dir")
+      .doc("The base directory of workspaces for data uploading.")
+      .stringConf
+      .createWithDefault("/tmp/spark-workspaces")
+
+  val HIVE_THRIFT_SERVER_DATA_UPLOAD_NOT_ALLOWED_CSV_OPTIONS =
+    buildStaticConf("spark.sql.hive.thriftServer.data.upload.not.allowed.csv.options")
+      .doc("The CSV options that are not allowed for data uploading.")
+      .stringConf
+      .createWithDefault("path")
+
   val SPARK_SESSION_EXTENSIONS = buildStaticConf("spark.sql.extensions")
     .doc("A comma-separated list of classes that implement " +
       "Function1[SparkSessionExtensions, Unit] used to configure Spark Session extensions. The " +

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala

Lines changed: 28 additions & 0 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.parser
 import java.util.Locale
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView}
 import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, FunctionResourceType, JarResource}
 import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal, PartitioningAttribute}
@@ -1557,6 +1558,33 @@ class DDLParserSuite extends AnalysisTest {
       Some(Map("ds" -> "2017-06-10"))))
   }
 
+  test("UPLOAD DATA INTO table") {
+    comparePlans(
+      parsePlan(
+        s"""
+           |UPLOAD DATA INPATH 'path' INTO TABLE table1
+         """.stripMargin),
+      UploadDataStatement(
+        new TableIdentifier("table1", None),
+        "path",
+        false,
+        None,
+        None))
+
+    comparePlans(
+      parsePlan(
+        s"""
+           |UPLOAD DATA INPATH 'path' OVERWRITE INTO
+           |TABLE table1 PARTITION(c='1', d='2') OPTION(e='3', f='true')
+         """.stripMargin),
+      UploadDataStatement(
+        new TableIdentifier("table1", None),
+        "path",
+        true,
+        Some(Map("c" -> "1", "d" -> "2")),
+        Some(Map("e" -> "3", "f" -> "true"))))
+  }
+
   test("SHOW CREATE table") {
     comparePlans(
       parsePlan("SHOW CREATE TABLE a.b.c"),

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 8 additions & 0 deletions
@@ -402,6 +402,14 @@ class ResolveSessionCatalog(
         isOverwrite,
         partition)
 
+    case UploadDataStatement(tbl, path, isOverwrite, partitionSpec, optionSpec) =>
+      UploadDataCommand(
+        tbl,
+        path,
+        isOverwrite,
+        partitionSpec,
+        optionSpec)
+
     case ShowCreateTableStatement(tbl, asSerde) if !asSerde =>
       val name = parseTempViewOrV1Table(tbl, "SHOW CREATE TABLE")
      ShowCreateTableCommand(name.asTableIdentifier)
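The clause maps the parsed statement onto UploadDataCommand one-to-one; the command's definition is in one of the changed files not shown in this excerpt. Expressed as a standalone analyzer rule, the rewrite looks like this (illustrative; in the patch it is simply another case in ResolveSessionCatalog's existing match, and the command's package is an assumption):

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UploadDataStatement}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.UploadDataCommand // assumed location; file not in this excerpt

// Illustrative standalone form of the rewrite performed above.
object ResolveUploadData extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    case UploadDataStatement(tbl, path, isOverwrite, partitionSpec, optionSpec) =>
      UploadDataCommand(tbl, path, isOverwrite, partitionSpec, optionSpec)
  }
}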
