From e252d7561c0b74461dd4d321b55ce043f36946ab Mon Sep 17 00:00:00 2001
From: kevincmchen
Date: Tue, 23 Feb 2021 16:05:58 +0800
Subject: [PATCH 1/3] fix the remaining problems in SPARK-34432.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

1. We don't need to implement SessionConfigSupport in the simple writable table data source tests, so remove it.
2. Change the schema of `SimpleWritableDataSource` to match `TestingV2Source`.
---
 .../sql/connector/JavaSimpleWritableDataSource.java | 8 +-------
 .../sql/connector/SimpleWritableDataSource.scala    | 12 +++++-------
 2 files changed, 6 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java
index b588d4c06e6e..74140d707c25 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaSimpleWritableDataSource.java
@@ -33,7 +33,6 @@
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.connector.SimpleCounter;
 import org.apache.spark.sql.connector.TestingV2Source;
-import org.apache.spark.sql.connector.catalog.SessionConfigSupport;
 import org.apache.spark.sql.connector.catalog.SupportsWrite;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCapability;
@@ -50,12 +49,7 @@
  * Each task writes data to `target/_temporary/uniqueId/$jobId-$partitionId-$attemptNumber`.
  * Each job moves files from `target/_temporary/uniqueId/` to `target`.
  */
-public class JavaSimpleWritableDataSource implements TestingV2Source, SessionConfigSupport {
-
-  @Override
-  public String keyPrefix() {
-    return "javaSimpleWritableDataSource";
-  }
+public class JavaSimpleWritableDataSource implements TestingV2Source {
 
   static class MyScanBuilder extends JavaSimpleScanBuilder {
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala
index f9306ba28e7f..95b84aa82345 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory, ScanBuilder}
 import org.apache.spark.sql.connector.write._
@@ -41,11 +41,9 @@ import org.apache.spark.util.SerializableConfiguration
  * Each task writes data to `target/_temporary/uniqueId/$jobId-$partitionId-$attemptNumber`.
  * Each job moves files from `target/_temporary/uniqueId/` to `target`.
  */
-class SimpleWritableDataSource extends SimpleTableProvider with SessionConfigSupport {
+class SimpleWritableDataSource extends SimpleTableProvider {
 
-  private val tableSchema = new StructType().add("i", "long").add("j", "long")
-
-  override def keyPrefix: String = "simpleWritableDataSource"
+  private val tableSchema = new StructType().add("i", "int").add("j", "int")
 
   class MyScanBuilder(path: String, conf: Configuration) extends SimpleScanBuilder {
     override def planInputPartitions(): Array[InputPartition] = {
@@ -179,7 +177,7 @@ class CSVReaderFactory(conf: SerializableConfiguration)
         }
       }
 
-      override def get(): InternalRow = InternalRow(currentLine.split(",").map(_.trim.toLong): _*)
+      override def get(): InternalRow = InternalRow(currentLine.split(",").map(_.trim.toInt): _*)
 
       override def close(): Unit = {
         inputStream.close()
@@ -222,7 +220,7 @@ class CSVDataWriter(fs: FileSystem, file: Path) extends DataWriter[InternalRow]
   private val out = fs.create(file)
 
   override def write(record: InternalRow): Unit = {
-    out.writeBytes(s"${record.getLong(0)},${record.getLong(1)}\n")
+    out.writeBytes(s"${record.getInt(0)},${record.getInt(1)}\n")
   }
 
   override def commit(): WriterCommitMessage = {

From 7f82afcf7c162dcde20c04222752d2d8d94a4459 Mon Sep 17 00:00:00 2001
From: kevincmchen
Date: Mon, 1 Mar 2021 19:39:03 +0800
Subject: [PATCH 2/3] extend TestingV2Source instead of SimpleTableProvider

---
 .../spark/sql/connector/SimpleWritableDataSource.scala | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala
index 95b84aa82345..c093349a3ecd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapabi
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory, ScanBuilder}
 import org.apache.spark.sql.connector.write._
-import org.apache.spark.sql.internal.connector.SimpleTableProvider
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.SerializableConfiguration
@@ -41,9 +40,7 @@ import org.apache.spark.util.SerializableConfiguration
  * Each task writes data to `target/_temporary/uniqueId/$jobId-$partitionId-$attemptNumber`.
  * Each job moves files from `target/_temporary/uniqueId/` to `target`.
  */
-class SimpleWritableDataSource extends SimpleTableProvider {
-
-  private val tableSchema = new StructType().add("i", "int").add("j", "int")
+class SimpleWritableDataSource extends TestingV2Source {
 
   class MyScanBuilder(path: String, conf: Configuration) extends SimpleScanBuilder {
     override def planInputPartitions(): Array[InputPartition] = {
@@ -66,7 +63,7 @@ class SimpleWritableDataSource extends SimpleTableProvider {
       new CSVReaderFactory(serializableConf)
     }
 
-    override def readSchema(): StructType = tableSchema
+    override def readSchema(): StructType = TestingV2Source.schema
   }
 
   class MyWriteBuilder(path: String, info: LogicalWriteInfo)
@@ -132,7 +129,7 @@ class SimpleWritableDataSource extends SimpleTableProvider {
     private val path = options.get("path")
     private val conf = SparkContext.getActive.get.hadoopConfiguration
 
-    override def schema(): StructType = tableSchema
+    override def schema(): StructType = TestingV2Source.schema
 
     override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
       new MyScanBuilder(new Path(path).toUri.toString, conf)

From 6c38617eb430e56acc40518760339cf73c86af0e Mon Sep 17 00:00:00 2001
From: kevincmchen
Date: Tue, 2 Mar 2021 15:02:37 +0800
Subject: [PATCH 3/3] it's unnecessary to override readSchema in MyScanBuilder
 and schema in MyTable, so delete these.

---
 .../spark/sql/connector/SimpleWritableDataSource.scala | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala
index c093349a3ecd..065ba4caebf3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SimpleWritableDataSource.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapabi
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory, ScanBuilder}
 import org.apache.spark.sql.connector.write._
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.SerializableConfiguration
 
@@ -62,8 +61,6 @@ class SimpleWritableDataSource extends TestingV2Source {
       val serializableConf = new SerializableConfiguration(conf)
       new CSVReaderFactory(serializableConf)
     }
-
-    override def readSchema(): StructType = TestingV2Source.schema
   }
 
   class MyWriteBuilder(path: String, info: LogicalWriteInfo)
@@ -129,8 +126,6 @@ class SimpleWritableDataSource extends TestingV2Source {
     private val path = options.get("path")
     private val conf = SparkContext.getActive.get.hadoopConfiguration
 
-    override def schema(): StructType = TestingV2Source.schema
-
     override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
       new MyScanBuilder(new Path(path).toUri.toString, conf)
     }
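
Note (not part of the patches above): a minimal usage sketch of the reworked test source, showing the int-typed (i, j) schema that `TestingV2Source.schema` now supplies. The names `spark` (a test SparkSession) and `path` (a writable temporary directory) are assumed for illustration; the existing DataSourceV2 test suites remain the authoritative usage.

// Hypothetical snippet: `spark` and `path` are assumed to exist; they are not
// defined in the patches themselves.
import org.apache.spark.sql.connector.SimpleWritableDataSource
import org.apache.spark.sql.functions.col

val cls = classOf[SimpleWritableDataSource]

// Write two int columns named i and j; the input is cast to int so it matches
// the schema the source now reports via TestingV2Source.schema.
spark.range(10)
  .select(col("id").cast("int").as("i"), (col("id") * 2).cast("int").as("j"))
  .write
  .format(cls.getName)
  .option("path", path)
  .mode("append")
  .save()

// Read the data back through the same source; the table schema comes from
// TestingV2Source.schema rather than a per-class tableSchema field.
val df = spark.read
  .format(cls.getName)
  .option("path", path)
  .load()

df.printSchema()  // i: integer, j: integer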