Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Address style checking by spark-csv Travis CI.
  • Loading branch information
rayortigas committed Apr 19, 2015
commit 2649026105611f3ddf608eab2ac5c8f17ff31f39
28 changes: 23 additions & 5 deletions src/main/scala/com/databricks/spark/csv/rdd/package.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2014 Databricks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.databricks.spark.csv

import org.apache.spark.rdd.RDD
Expand All @@ -15,14 +30,17 @@ package object rdd {
escape: Char = '\\',
mode: String = "DROPMALFORMED"): RDD[T] = {

if (mode == util.ParseModes.PERMISSIVE_MODE)
if (mode == util.ParseModes.PERMISSIVE_MODE) {
throw new IllegalArgumentException(s"permissive mode is invalid for this method")
}

val schema = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]
if (schema.exists { structField => !structField.dataType.isPrimitive })
if (schema.exists { structField => !structField.dataType.isPrimitive }) {
throw new IllegalArgumentException(s"type must be a case class with only primitive fields")
}

val df = new CsvContext(sqlContext).csvFile(filePath, useHeader, delimiter, quote, escape, mode, Some(schema))
val csvContext = new CsvContext(sqlContext)
val df = csvContext.csvFile(filePath, useHeader, delimiter, quote, escape, mode, Some(schema))
df.mapPartitions[T] { iter =>
val rowConverter = RowConverter[T]()
iter.map { row => rowConverter.convert(row) }
Expand All @@ -38,7 +56,7 @@ package object rdd {
}

case class RowConverter[T]()(implicit ct: scala.reflect.ClassTag[T]) {
// http://docs.scala-lang.org/overviews/reflection/environment-universes-mirrors.html#types-of-mirrors-their-use-cases--examples
// http://docs.scala-lang.org/overviews/reflection/environment-universes-mirrors.html

// For Scala 2.10, because we're initializing the runtime universe, this is not thread-safe.
// http://docs.scala-lang.org/overviews/reflection/thread-safety.html
Expand All @@ -56,4 +74,4 @@ package object rdd {
constructorMirror.apply(args: _*).asInstanceOf[T]
}
}
}
}