JSON data source refactor initial draft
Yanbo Liang committed Feb 15, 2015
commit d1d4ed13eda20655836bcadd116a2553ae121d58
@@ -67,7 +67,7 @@ private[sql] class DefaultSource
      case SaveMode.Append =>
        sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}")
      case SaveMode.Overwrite =>
-       fs.delete(filesystemPath, true)
+       //fs.delete(filesystemPath, true)
        true
      case SaveMode.ErrorIfExists =>
        sys.error(s"path $path already exists.")
@@ -76,12 +76,18 @@ private[sql] class DefaultSource
    } else {
      true
    }
-   if (doSave) {
+   val relation = if (doSave) {
      // Only save data when the save mode is not ignore.
-     data.toJSON.saveAsTextFile(path)
+     //data.toJSON.saveAsTextFile(path)
+     val createdRelation = createRelation(sqlContext, parameters, data.schema)
+     createdRelation.asInstanceOf[JSONRelation].insert(data, true)
+
+     createdRelation
+   } else {
+     createRelation(sqlContext, parameters, data.schema)
    }

-   createRelation(sqlContext, parameters, data.schema)
+   relation
  }
}
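The shape of this refactor is easier to see outside the diff: rather than serializing the DataFrame directly in createRelation, the draft builds the relation first and routes the write through its insert method, so the overwrite logic lives in one place. Below is a minimal, self-contained sketch of that pattern; the trait and method names echo the Spark sources, but the bodies are illustrative stand-ins, not the committed code.

// Stand-in sketch: build the relation, then delegate the write to insert().
trait InsertableRelation {
  def insert(data: Seq[String], overwrite: Boolean): Unit
}

class JsonRelation(path: String) extends InsertableRelation {
  override def insert(data: Seq[String], overwrite: Boolean): Unit =
    println(s"writing ${data.size} records to $path (overwrite = $overwrite)")
}

object CreateRelationSketch {
  def createRelation(path: String): JsonRelation = new JsonRelation(path)

  // Mirrors the control flow of the hunk above: save only when doSave is set,
  // and return the same relation either way.
  def createRelation(path: String, data: Seq[String], doSave: Boolean): JsonRelation = {
    val relation = createRelation(path)
    if (doSave) relation.insert(data, overwrite = true)
    relation
  }

  def main(args: Array[String]): Unit =
    createRelation("/tmp/json_table", Seq("""{"a":1}""", """{"a":2}"""), doSave = true)
}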

@@ -92,7 +98,8 @@ private[sql] case class JSONRelation(
    @transient val sqlContext: SQLContext)
  extends TableScan with InsertableRelation {
  // TODO: Support partitioned JSON relation.
- private def baseRDD = sqlContext.sparkContext.textFile(path)
+ val filePath = new Path(path, "*").toUri.toString
+ private def baseRDD = sqlContext.sparkContext.textFile(filePath)

  override val schema = userSpecifiedSchema.getOrElse(
    JsonRDD.nullTypeToStringType(
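One detail worth calling out in the hunk above: the relation now reads path/* instead of path itself. Because the insert path below renames its output into a data subdirectory, the table's files no longer sit at the top level, and the glob lets sc.textFile pick up files one level down (e.g. <path>/data/part-00000). A tiny sketch of what the expression evaluates to, assuming only hadoop-common on the classpath; the directory name is hypothetical:

import org.apache.hadoop.fs.Path

object GlobPathSketch {
  def main(args: Array[String]): Unit = {
    val path = "/tmp/json_table" // hypothetical table directory
    // Same expression as in the diff: append a "*" child component so the
    // resulting string is a glob over the table directory's children.
    val filePath = new Path(path, "*").toUri.toString
    println(filePath) // prints /tmp/json_table/*
  }
}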
@@ -104,21 +111,38 @@
  override def buildScan() =
    JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord)

+ private def isTemporaryFile(file: Path): Boolean = {
+   file.getName == "_temporary"
+ }
+
  override def insert(data: DataFrame, overwrite: Boolean) = {
+
    val filesystemPath = new Path(path)
    val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)

+   // If the path exists, it must be a directory.
+   // Otherwise we create a directory with the path name.
+   if (fs.exists(filesystemPath) && !fs.getFileStatus(filesystemPath).isDirectory) {
+     sys.error("a CREATE [TEMPORARY] TABLE AS SELECT statement requires the path to be a directory")
+   }
+
    if (overwrite) {
+     val temporaryPath = new Path(path, "_temporary")
+     val dataPath = new Path(path, "data")
+     // Write the new data into a temporary subdirectory first.
+     data.toJSON.saveAsTextFile(temporaryPath.toUri.toString)
+     val pathsToDelete = fs.listStatus(filesystemPath).filter(
+       f => !isTemporaryFile(f.getPath)).map(_.getPath)
+
      try {
-       fs.delete(filesystemPath, true)
+       pathsToDelete.foreach(fs.delete(_, true))
      } catch {
        case e: IOException =>
          throw new IOException(
-           s"Unable to clear output directory ${filesystemPath.toString} prior"
-             + s" to INSERT OVERWRITE a JSON table:\n${e.toString}")
+           s"Unable to delete original data in directory ${filesystemPath.toString} when"
+             + s" running INSERT OVERWRITE on a JSON table:\n${e.toString}")
      }
-     // Write the data.
-     data.toJSON.saveAsTextFile(path)
+     fs.rename(temporaryPath, dataPath)
      // Right now, we assume that the schema is not changed. We will not update the schema.
      // schema = data.schema
    } else {
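This overwrite choreography is the heart of the draft: the query result is fully materialized into _temporary before any existing file is deleted, which is what allows INSERT OVERWRITE to read from the very table it is replacing (exercised by the test added below). Here is a self-contained sketch of the same three steps against the Hadoop FileSystem API; overwrite, tableDir, and writeOutput are names invented for this illustration.

import java.io.IOException
import org.apache.hadoop.fs.{FileSystem, Path}

object OverwriteBySwapSketch {
  // Sketch of insert(overwrite = true) from the diff: write first, delete the
  // old data second, then rename the temporary output into place.
  def overwrite(fs: FileSystem, tableDir: Path)(writeOutput: Path => Unit): Unit = {
    val temporaryPath = new Path(tableDir, "_temporary")
    val dataPath = new Path(tableDir, "data")

    // Step 1: materialize the new data while the old files are still readable.
    writeOutput(temporaryPath)

    // Step 2: delete everything under the table directory except _temporary.
    val pathsToDelete = fs.listStatus(tableDir)
      .filter(_.getPath.getName != "_temporary")
      .map(_.getPath)
    try {
      pathsToDelete.foreach(fs.delete(_, true))
    } catch {
      case e: IOException =>
        throw new IOException(s"Unable to delete original data in $tableDir: $e")
    }

    // Step 3: promote the temporary output to its final location.
    fs.rename(temporaryPath, dataPath)
  }
}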
@@ -29,10 +29,13 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
  import caseInsensisitiveContext._

  var path: File = null
+ var existPath: File = null

  override def beforeAll(): Unit = {
    path = util.getTempFilePath("jsonCTAS").getCanonicalFile
+   existPath = util.getTempFilePath("existJsonCTAS").getCanonicalFile
    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+   rdd.saveAsTextFile(existPath.toURI.toString)
    jsonRDD(rdd).registerTempTable("jt")
  }

@@ -62,6 +65,34 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
dropTempTable("jsonTable")
}

test("INSERT OVERWRITE with the source and destination point to the same table") {
sql(
s"""
|CREATE TEMPORARY TABLE jsonTable1
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
| path '${existPath.toString}'
|)
""".stripMargin)

sql(
s"""
|CREATE TEMPORARY TABLE jsonTable2
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
| path '${existPath.toString}'
|) AS
|SELECT a, b FROM jsonTable1
""".stripMargin)

checkAnswer(
sql("SELECT a, b FROM jsonTable2"),
sql("SELECT a, b FROM jt").collect())

dropTempTable("jsonTable1")
dropTempTable("jsonTable2")
}

test("create a table, drop it and create another one with the same name") {
sql(
s"""