fix the remaining problems in SPARK-34432.
1. We don't need to implement SessionConfigSupport in the simple writable table data source tests, so remove it.
2. Change the schema of `SimpleWritableDataSource` to match `TestingV2Source`.
kevincmchen committed Feb 23, 2021
commit e252d7561c0b74461dd4d321b55ce043f36946ab
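For context: SessionConfigSupport is the DataSource V2 mix-in that forwards session configurations named `spark.datasource.<keyPrefix>.<key>` to a source as the option `<key>`. The tests touched here never read options that way, so the mix-in was dead weight. Below is a minimal, self-contained sketch of the mechanism with a hypothetical EchoConfigSource; only SessionConfigSupport and the TableProvider/Table interfaces are the real API, everything else is illustrative.

import java.util

import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, Table, TableCapability, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical source illustrating SessionConfigSupport: with
//   spark.conf.set("spark.datasource.echoConfig.path", "/tmp/x")
// every read/write on this source in the session receives the
// option "path" -> "/tmp/x" without the caller passing it explicitly.
class EchoConfigSource extends TableProvider with SessionConfigSupport {

  override def keyPrefix: String = "echoConfig"

  override def inferSchema(options: CaseInsensitiveStringMap): StructType = {
    // Options forwarded from session confs show up here and in getTable.
    new StructType().add("i", "int").add("j", "int")
  }

  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table = {
    val resolvedSchema = schema
    new Table {
      override def name(): String = "echo"
      override def schema(): StructType = resolvedSchema
      override def capabilities(): util.Set[TableCapability] =
        util.Collections.emptySet()
    }
  }
}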
JavaSimpleWritableDataSource.java
@@ -33,7 +33,6 @@
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.connector.SimpleCounter;
 import org.apache.spark.sql.connector.TestingV2Source;
-import org.apache.spark.sql.connector.catalog.SessionConfigSupport;
 import org.apache.spark.sql.connector.catalog.SupportsWrite;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCapability;
@@ -50,12 +49,7 @@
  * Each task writes data to `target/_temporary/uniqueId/$jobId-$partitionId-$attemptNumber`.
  * Each job moves files from `target/_temporary/uniqueId/` to `target`.
  */
-public class JavaSimpleWritableDataSource implements TestingV2Source, SessionConfigSupport {
-
-  @Override
-  public String keyPrefix() {
-    return "javaSimpleWritableDataSource";
-  }
+public class JavaSimpleWritableDataSource implements TestingV2Source {
 
   static class MyScanBuilder extends JavaSimpleScanBuilder {
 
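For reference, the fixed schema these sources are being aligned with lives in the TestingV2Source companion in the same test suite. A sketch of the relevant piece, inferred from the long-to-int changes in this commit (treat the exact declaration as an assumption):

import org.apache.spark.sql.types.StructType

// The shared test schema: two int columns, "i" and "j". Both the Java source
// above and the Scala source below are updated to read and write exactly this.
object TestingV2Source {
  val schema: StructType = new StructType().add("i", "int").add("j", "int")
}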
SimpleWritableDataSource.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory, ScanBuilder}
 import org.apache.spark.sql.connector.write._
@@ -41,11 +41,9 @@ import org.apache.spark.util.SerializableConfiguration
  * Each task writes data to `target/_temporary/uniqueId/$jobId-$partitionId-$attemptNumber`.
  * Each job moves files from `target/_temporary/uniqueId/` to `target`.
  */
-class SimpleWritableDataSource extends SimpleTableProvider with SessionConfigSupport {
+class SimpleWritableDataSource extends SimpleTableProvider {
Contributor: can we extend TestingV2Source instead?

kevincmchen (Contributor, author), Mar 1, 2021: yes, I'll fix it
-  private val tableSchema = new StructType().add("i", "long").add("j", "long")
-
-  override def keyPrefix: String = "simpleWritableDataSource"
+  private val tableSchema = new StructType().add("i", "int").add("j", "int")
 
   class MyScanBuilder(path: String, conf: Configuration) extends SimpleScanBuilder {
     override def planInputPartitions(): Array[InputPartition] = {
@@ -179,7 +177,7 @@ class CSVReaderFactory(conf: SerializableConfiguration)
     }
   }
 
-  override def get(): InternalRow = InternalRow(currentLine.split(",").map(_.trim.toLong): _*)
+  override def get(): InternalRow = InternalRow(currentLine.split(",").map(_.trim.toInt): _*)
 
   override def close(): Unit = {
     inputStream.close()
@@ -222,7 +220,7 @@ class CSVDataWriter(fs: FileSystem, file: Path) extends DataWriter[InternalRow]
   private val out = fs.create(file)
 
   override def write(record: InternalRow): Unit = {
-    out.writeBytes(s"${record.getLong(0)},${record.getLong(1)}\n")
+    out.writeBytes(s"${record.getInt(0)},${record.getInt(1)}\n")
Member: This change just matches the TestingV2Source.schema?

Contributor: Yea

   }
 
   override def commit(): WriterCommitMessage = {
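The two type changes above are two halves of one contract: CSVDataWriter renders each row's two int columns as text, and CSVReaderFactory parses that text back. A standalone sketch of the round trip under the new int schema (plain Scala, hypothetical object name, no Spark session required):

import org.apache.spark.sql.catalyst.InternalRow

// Round-trip sketch: encode mirrors CSVDataWriter.write, decode mirrors
// CSVReaderFactory.get. Had the writer kept getLong while TestingV2Source
// declares int columns, reads against the shared schema would break.
object CsvRoundTripSketch {

  def encode(row: InternalRow): String =
    s"${row.getInt(0)},${row.getInt(1)}\n"

  def decode(line: String): InternalRow =
    InternalRow(line.split(",").map(_.trim.toInt): _*)

  def main(args: Array[String]): Unit = {
    val original = InternalRow(1, 2)
    val restored = decode(encode(original).trim)
    assert(restored.getInt(0) == 1 && restored.getInt(1) == 2)
    println("round trip ok")
  }
}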