diff --git a/.gitignore b/.gitignore
index c58d83b..4a8e38c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,3 +15,19 @@ project/plugins/project/
# Scala-IDE specific
.scala_dependencies
.worksheet
+.idea/
+
+# emacs stuff
+\#*\#
+\.\#*
+*~
+sbt/*launch*.jar
+
+# python
+*.pyc
+
+# native
+*.o
+*.so
+*.so.0.0.0
+*.so.0
\ No newline at end of file
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..58147d3
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,64 @@
+language: scala
+sudo: false
+cache:
+ directories:
+ - $HOME/.ivy2
+ - $HOME/spark
+ - $HOME/.cache/pip
+ - $HOME/.pip-cache
+ - $HOME/.sbt/launchers
+ - $HOME/perl5
+scala:
+ - 2.11.6
+jdk:
+ - oraclejdk8
+r:
+ - release
+addons:
+ apt:
+  sources:
+  - ubuntu-toolchain-r-test
+  - ppa:marutter/rdev
+  packages:
+  - gfortran
+  - gcc
+  - binutils
+  - python-pip
+  - python-pandas
+  - python-numpy
+  - cmake
+  - perl
+  - cpanminus
+  - r-base
+  - libcurl4-gnutls-dev
+  - libxml2-dev
+  - libssl-dev
+  - r-base-dev
+  - axel
+r_packages:
+ - Imap
+before_install:
+ # Setup Python
+ - pip install --user codecov unittest2 nose pep8 pylint
+ # Setup perl
+ - cpanm --force --local-lib $HOME/perl5 --quiet --notest Pithub || cat ~/.cpanm/build.log
+ - cd ./src/main/perl; cpanm --local-lib $HOME/perl5 --force --quiet --installdeps --notest .; cd ../../../
+ - PATH="$HOME/perl5/bin${PATH:+:${PATH}}"; export PATH;
+ - PERL5LIB=":$HOME/perl5/lib/perl5${PERL5LIB:+:${PERL5LIB}}"; export PERL5LIB;
+ - PERL_LOCAL_LIB_ROOT="$HOME/perl5${PERL_LOCAL_LIB_ROOT:+:${PERL_LOCAL_LIB_ROOT}}"; export PERL_LOCAL_LIB_ROOT;
+ - PERL_MB_OPT="--install_base \"$HOME/perl5\""; export PERL_MB_OPT;
+ - PERL_MM_OPT="INSTALL_BASE=$HOME/perl5"; export PERL_MM_OPT;
+script:
+ - "export SPARK_CONF_DIR=./log4j/"
+ - sbt clean coverage compile package assembly test || (rm -rf ~/.ivy2 ~/.m2 && sbt clean coverage compile package test)
+ - "[ -f spark] || mkdir spark && cd spark && axel http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz && cd .."
+ - "tar -xf ./spark/spark-2.1.0-bin-hadoop2.7.tgz"
+ - "export SPARK_HOME=`pwd`/spark-2.1.0-bin-hadoop2.7"
+ - "export PYTHONPATH=$SPARK_HOME/python:`ls -1 $SPARK_HOME/python/lib/py4j-*-src.zip`:$PYTHONPATH"
+ - "PYSPARK_SUBMIT_ARGS='--jars ./target/examples-assembly-0.0.1.jar pyspark-shell' nosetests --with-doctest --doctest-options=+ELLIPSIS --logging-level=INFO --detailed-errors --verbosity=2 --with-coverage --cover-html-dir=./htmlcov"
+ # $SPARK_HOME/bin/spark-submit ./src/main/r/wc.R $SPARK_HOME/README.md
+ # $SPARK_HOME/bin/spark-submit ./src/main/r/dapply.R
+after_success:
+ - sbt coverageReport || sbt update coverageReport
+ - codecov
\ No newline at end of file
diff --git a/LICENSE b/LICENSE
index 8f71f43..80f405b 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,3 +1,6 @@
+Individual components under resources/ are available under their own licenses:
+ * The MySQL connector is licensed under the GPL.
+The source code in this repo is available under the Apache License:
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
diff --git a/README.md b/README.md
index a7f4184..551928f 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,10 @@
# high-performance-spark-examples
Examples for High Performance Spark
+
+# Building
+
+Most of the examples can be built with sbt; the C and Fortran components additionally require gcc, gfortran, and cmake.
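+
+For example, a local build might look roughly like the following (a sketch; the sbt-jni plugin drives cmake, gcc, and gfortran for the native code, so exact steps can vary by system):
+
+```bash
+./sbt/sbt clean compile package
+```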
+
+# Tests
+
+The full test suite requires the C and Fortran components to be built, as well as a local R installation.
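+
+A rough sketch of what the CI does (see `.travis.yml`; flags, versions, and the jar path may need adjusting for a local run):
+
+```bash
+export SPARK_HOME=/path/to/spark-2.1.0-bin-hadoop2.7
+./sbt/sbt clean compile assembly test
+PYSPARK_SUBMIT_ARGS='--jars ./target/examples-assembly-0.0.1.jar pyspark-shell' \
+  nosetests --with-doctest --doctest-options=+ELLIPSIS
+```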
diff --git a/appveyor.yml b/appveyor.yml
new file mode 100644
index 0000000..d8be93f
--- /dev/null
+++ b/appveyor.yml
@@ -0,0 +1,27 @@
+version: '{build}'
+
+platform:
+ - x86
+ - x64
+
+environment:
+ matrix:
+ - JAVA_HOME: C:\Program Files\Java\jdk1.8.0
+
+
+install:
+- ps: Start-FileDownload 'http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/0.13.9/sbt-launch.jar'
+- xcopy sbt-launch.jar sbt\
+- del build.sbt
+- copy build_windows.sbt build.sbt
+
+build_script:
+- sbt\sbt clean compile
+
+test_script:
+- sbt\sbt "testOnly com.highperformancespark.examples.tools.FilterInvalidPandasSuite"
+
+cache:
+- C:\Users\appveyor\.ivy2
+- C:\Users\appveyor\.m2
+- C:\Users\appveyor\.sbt
\ No newline at end of file
diff --git a/build.sbt b/build.sbt
new file mode 100644
index 0000000..35b1508
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,97 @@
+organization := "com.highperformancespark"
+
+name := "examples"
+
+publishMavenStyle := true
+
+version := "0.0.1"
+
+scalaVersion := "2.11.6"
+scalaVersion in ThisBuild := "2.11.6"
+ivyScala := ivyScala.value map { _.copy(overrideScalaVersion = true) }
+
+crossScalaVersions := Seq("2.11.6")
+
+javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
+
+//tag::sparkVersion[]
+sparkVersion := "2.2.0"
+//end::sparkVersion[]
+
+//tag::sparkComponents[]
+sparkComponents ++= Seq("core")
+//end::sparkComponents[]
+//tag::sparkExtraComponents[]
+sparkComponents ++= Seq("streaming", "mllib")
+//end::sparkExtraComponents[]
+//tag::addSQLHiveComponent[]
+sparkComponents ++= Seq("sql", "hive", "hive-thriftserver", "hive-thriftserver")
+//end::addSQLHiveComponent[]
+
+parallelExecution in Test := false
+
+fork := true
+
+javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled", "-Djna.nosys=true")
+
+// additional libraries
+libraryDependencies ++= Seq(
+ "org.scalatest" %% "scalatest" % "3.0.1",
+ "org.scalacheck" %% "scalacheck" % "1.13.4",
+ "junit" % "junit" % "4.12",
+ "junit" % "junit" % "4.11",
+ "com.holdenkarau" %% "spark-testing-base" % "2.2.0_0.7.2",
+ "com.novocode" % "junit-interface" % "0.11" % "test->default",
+ //tag::scalaLogging[]
+ "com.typesafe.scala-logging" %% "scala-logging" % "3.5.0",
+ //end::scalaLogging[]
+ "org.codehaus.jackson" % "jackson-core-asl" % "1.8.8",
+ "org.codehaus.jackson" % "jackson-mapper-asl" % "1.8.8",
+ "org.codehaus.jackson" % "jackson-core-asl" % "1.9.13",
+ "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13",
+ "net.java.dev.jna" % "jna" % "4.2.2")
+
+
+scalacOptions ++= Seq("-deprecation", "-unchecked")
+
+pomIncludeRepository := { x => false }
+
+resolvers ++= Seq(
+ "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
+ "Spray Repository" at "http://repo.spray.cc/",
+ "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
+ "Akka Repository" at "http://repo.akka.io/releases/",
+ "Twitter4J Repository" at "http://twitter4j.org/maven2/",
+ "Apache HBase" at "https://repository.apache.org/content/repositories/releases",
+ "Twitter Maven Repo" at "http://maven.twttr.com/",
+ "scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools",
+ "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/",
+ "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
+ "Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/",
+ "Mesosphere Public Repository" at "http://downloads.mesosphere.io/maven",
+ Resolver.sonatypeRepo("public"),
+ Resolver.bintrayRepo("jodersky", "sbt-jni-macros"),
+ "jodersky" at "https://dl.bintray.com/jodersky/maven/"
+)
+
+licenses := Seq("Apache License 2.0" -> url("http://www.apache.org/licenses/LICENSE-2.0.html"))
+
+mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
+ {
+ case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
+ case m if m.startsWith("META-INF") => MergeStrategy.discard
+ case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
+ case PathList("org", "apache", xs @ _*) => MergeStrategy.first
+ case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
+ case "log4j.properties" => MergeStrategy.discard
+ case "about.html" => MergeStrategy.rename
+ case "reference.conf" => MergeStrategy.concat
+ case _ => MergeStrategy.first
+ }
+}
+
+// JNI
+
+enablePlugins(JniNative)
+
+sourceDirectory in nativeCompile := sourceDirectory.value
diff --git a/build_windows.sbt b/build_windows.sbt
new file mode 100644
index 0000000..b698ab9
--- /dev/null
+++ b/build_windows.sbt
@@ -0,0 +1,91 @@
+organization := "com.highperformancespark"
+
+name := "examples"
+
+publishMavenStyle := true
+
+version := "0.0.1"
+
+scalaVersion := "2.11.6"
+scalaVersion in ThisBuild := "2.11.6"
+ivyScala := ivyScala.value map { _.copy(overrideScalaVersion = true) }
+
+crossScalaVersions := Seq("2.11.6")
+
+javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
+
+//tag::sparkVersion[]
+sparkVersion := "2.2.0"
+//end::sparkVersion[]
+
+//tag::sparkComponents[]
+sparkComponents ++= Seq("core")
+//end::sparkComponents[]
+//tag::sparkExtraComponents[]
+sparkComponents ++= Seq("streaming", "mllib")
+//end::sparkExtraComponents[]
+//tag::addSQLHiveComponent[]
+sparkComponents ++= Seq("sql", "hive", "hive-thriftserver", "hive-thriftserver")
+//end::addSQLHiveComponent[]
+
+parallelExecution in Test := false
+
+fork := true
+
+javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled", "-Djna.nosys=true")
+
+// additional libraries
+libraryDependencies ++= Seq(
+ "org.scalatest" %% "scalatest" % "3.0.1",
+ "org.scalacheck" %% "scalacheck" % "1.13.4",
+ "junit" % "junit" % "4.12",
+ "junit" % "junit" % "4.11",
+ "com.holdenkarau" %% "spark-testing-base" % "2.2.0_0.7.2",
+ "com.novocode" % "junit-interface" % "0.11" % "test->default",
+ //tag::scalaLogging[]
+ "com.typesafe.scala-logging" %% "scala-logging" % "3.5.0",
+ //end::scalaLogging[]
+ "org.codehaus.jackson" % "jackson-core-asl" % "1.8.8",
+ "org.codehaus.jackson" % "jackson-mapper-asl" % "1.8.8",
+ "org.codehaus.jackson" % "jackson-core-asl" % "1.9.13",
+ "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13",
+ "net.java.dev.jna" % "jna" % "4.2.2")
+
+
+scalacOptions ++= Seq("-deprecation", "-unchecked")
+
+pomIncludeRepository := { x => false }
+
+resolvers ++= Seq(
+ "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
+ "Spray Repository" at "http://repo.spray.cc/",
+ "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
+ "Akka Repository" at "http://repo.akka.io/releases/",
+ "Twitter4J Repository" at "http://twitter4j.org/maven2/",
+ "Apache HBase" at "https://repository.apache.org/content/repositories/releases",
+ "Twitter Maven Repo" at "http://maven.twttr.com/",
+ "scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools",
+ "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/",
+ "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
+ "Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/",
+ "Mesosphere Public Repository" at "http://downloads.mesosphere.io/maven",
+ Resolver.sonatypeRepo("public"),
+ Resolver.bintrayRepo("jodersky", "sbt-jni-macros"),
+ "jodersky" at "https://dl.bintray.com/jodersky/maven/"
+)
+
+licenses := Seq("Apache License 2.0" -> url("http://www.apache.org/licenses/LICENSE-2.0.html"))
+
+mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
+ {
+ case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
+ case m if m.startsWith("META-INF") => MergeStrategy.discard
+ case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
+ case PathList("org", "apache", xs @ _*) => MergeStrategy.first
+ case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
+ case "log4j.properties" => MergeStrategy.discard
+ case "about.html" => MergeStrategy.rename
+ case "reference.conf" => MergeStrategy.concat
+ case _ => MergeStrategy.first
+ }
+}
diff --git a/conf/log4j.properties b/conf/log4j.properties
new file mode 100644
index 0000000..e90a817
--- /dev/null
+++ b/conf/log4j.properties
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the console
+log4j.rootCategory=ERROR, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Set the default spark-shell log level to ERROR. When running the spark-shell, the
+# log level for this class is used to overwrite the root logger's log level, so that
+# the user can have different defaults for the shell and regular Spark apps.
+log4j.logger.org.apache.spark.repl.Main=ERROR
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.spark-project.jetty=ERROR
+log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
+
+# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
+log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
+log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
diff --git a/conf/sbtconfig.txt b/conf/sbtconfig.txt
new file mode 100644
index 0000000..9f17943
--- /dev/null
+++ b/conf/sbtconfig.txt
@@ -0,0 +1,18 @@
+
+# Set the java args to high
+
+-Xmx2048M
+
+-XX:MaxPermSize=2048m
+
+-XX:ReservedCodeCacheSize=128m
+
+-XX:+CMSClassUnloadingEnabled
+
+# Set the extra SBT options
+
+-Dsbt.log.format=true
+
+# JNA
+
+-Djna.nosys=true
diff --git a/high_performance_pyspark/SQLLineage.py b/high_performance_pyspark/SQLLineage.py
new file mode 100644
index 0000000..121f0b4
--- /dev/null
+++ b/high_performance_pyspark/SQLLineage.py
@@ -0,0 +1,74 @@
+"""
+>>> df = rdd.toDF()
+>>> df2 = cutLineage(df)
+>>> df.head() == df2.head()
+True
+>>> df.schema == df2.schema
+True
+"""
+
+global df
+global sc
+global rdd
+global spark
+
+from pyspark.context import SparkContext
+from pyspark.sql import DataFrame, Row
+from pyspark.sql.session import SparkSession
+
+# tag::cutLineage[]
+def cutLineage(df):
+ """
+ Cut the lineage of a DataFrame - used for iterative algorithms
+
+ .. Note: This uses internal members and may break between versions
+ >>> df = rdd.toDF()
+ >>> cutDf = cutLineage(df)
+ >>> cutDf.count()
+ 3
+ """
+ jRDD = df._jdf.toJavaRDD()
+ jSchema = df._jdf.schema()
+ jRDD.cache()
+ sqlCtx = df.sql_ctx
+ try:
+  javaSqlCtx = sqlCtx._jsqlContext
+ except:
+  javaSqlCtx = sqlCtx._ssql_ctx
+ newJavaDF = javaSqlCtx.createDataFrame(jRDD, jSchema)
+ newDF = DataFrame(newJavaDF, sqlCtx)
+ return newDF
+# end::cutLineage[]
+
+def _setupTest():
+ globs = globals()
+ spark = SparkSession.builder \
+ .master("local[4]") \
+ .getOrCreate()
+ sc = spark._sc
+ sc.setLogLevel("ERROR")
+ globs['sc'] = sc
+ globs['spark'] = spark
+ globs['rdd'] = rdd = sc.parallelize(
+ [Row(field1=1, field2="row1"),
+ Row(field1=2, field2="row2"),
+ Row(field1=3, field2="row3")])
+ return globs
+
+def _test():
+ """
+ Run the tests.
+ """
+ import doctest
+ globs = _setupTest()
+ (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
+ globs['sc'].stop()
+ if failure_count:
+  exit(-1)
+
+import sys
+if __name__ == "__main__":
+ _test()
+# Hack to support running in nose
+elif sys.stdout != sys.__stdout__:
+ _setupTest()
diff --git a/high_performance_pyspark/__init__.py b/high_performance_pyspark/__init__.py
new file mode 100644
index 0000000..7741593
--- /dev/null
+++ b/high_performance_pyspark/__init__.py
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+"""
+Python version of selected examples from High Performance Spark
+"""
+
+import os
+import sys
+
diff --git a/high_performance_pyspark/bad_pyspark.py b/high_performance_pyspark/bad_pyspark.py
new file mode 100644
index 0000000..46741dc
--- /dev/null
+++ b/high_performance_pyspark/bad_pyspark.py
@@ -0,0 +1,164 @@
+# This script triggers a number of different PySpark errors
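+# The functions below are normally exercised as doctests; a rough local invocation
+# (assuming a working PySpark environment) looks like:
+#   nosetests --with-doctest high_performance_pyspark/bad_pyspark.py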
+
+from pyspark import *
+from pyspark.sql.session import SparkSession
+
+global sc
+
+def nonExistentInput(sc):
+ """
+ Attempt to load nonexistent input
+ >>> nonExistentInput(sc)
+ Traceback (most recent call last):
+ ...
+ Py4JJavaError:...
+ """
+ # tag::nonExistent[]
+ failedRdd = sc.textFile("file:///doesnotexist")
+ failedRdd.count()
+ # end::nonExistent[]
+
+def throwOuter(sc):
+ """
+ Trigger a divide-by-zero error in the second (outer) transformation.
+ >>> throwOuter(sc)
+ Traceback (most recent call last):
+ ...
+ Py4JJavaError:...
+ """
+ # tag::throwOuter[]
+ data = sc.parallelize(range(10))
+ transform1 = data.map(lambda x: x + 1)
+ transform2 = transform1.map(lambda x: x / 0)
+ transform2.count()
+ # end::throwOuter[]
+
+def throwInner(sc):
+ """
+ Trigger a divide-by-zero error in the first (inner) transformation.
+ >>> throwInner(sc)
+ Traceback (most recent call last):
+ ...
+ Py4JJavaError:...
+ """
+ # tag::throwInner[]
+ data = sc.parallelize(range(10))
+ transform1 = data.map(lambda x: x / 0)
+ transform2 = transform1.map(lambda x: x + 1)
+ transform2.count()
+ # end::throwInner[]
+
+# tag::rewrite[]
+def add1(x):
+ """
+ Add 1
+ >>> add1(2)
+ 3
+ """
+ return x + 1
+
+def divZero(x):
+ """
+ Divide by zero (cause an error)
+ >>> divZero(2)
+ Traceback (most recent call last):
+ ...
+ ZeroDivisionError: integer division or modulo by zero
+ """
+ return x / 0
+
+def throwOuter2(sc):
+ """
+ Same as throwOuter, but with named functions instead of lambdas.
+ >>> throwOuter2(sc)
+ Traceback (most recent call last):
+ ...
+ Py4JJavaError:...
+ """
+ data = sc.parallelize(range(10))
+ transform1 = data.map(add1)
+ transform2 = transform1.map(divZero)
+ transform2.count()
+
+def throwInner2(sc):
+ """
+ Same as throwInner, but with named functions instead of lambdas.
+ >>> throwInner2(sc)
+ Traceback (most recent call last):
+ ...
+ Py4JJavaError:...
+ """
+ data = sc.parallelize(range(10))
+ transform1 = data.map(divZero)
+ transform2 = transform1.map(add1)
+ transform2.count()
+# end::rewrite[]
+
+def throwInner3(sc):
+ """
+ Divide by zero inside a flatMap, counting the rejected records with an accumulator instead of failing the job.
+ >>> throwInner3(sc)
+ Reject 10
+ """
+ data = sc.parallelize(range(10))
+ rejectedCount = sc.accumulator(0)
+ def loggedDivZero(x):
+  import logging
+  try:
+   return [x / 0]
+  except Exception as e:
+   rejectedCount.add(1)
+   logging.warning("Error found " + repr(e))
+   return []
+ transform1 = data.flatMap(loggedDivZero)
+ transform2 = transform1.map(add1)
+ transform2.count()
+ print("Reject " + str(rejectedCount.value))
+
+
+def runOutOfMemory(sc):
+ """
+ Run out of memory on the workers.
+ In standalone mode this results in a memory error, but in YARN it may trigger YARN
+ container overhead errors.
+ >>> runOutOfMemory(sc)
+ Traceback (most recent call last):
+ ...
+ Py4JJavaError:...
+ """
+ # tag::worker_oom[]
+ data = sc.parallelize(range(10))
+ def generate_too_much(itr):
+  return range(10000000000000)
+ itr = data.flatMap(generate_too_much)
+ itr.count()
+ # end::worker_oom[]
+
+def _setupTest():
+ globs = globals()
+ spark = SparkSession.builder \
+ .master("local[4]") \
+ .getOrCreate()
+ sc = spark._sc
+ globs['sc'] = sc
+ return globs
+
+def _test():
+ """
+ Run the tests.
+ Note this will print a lot of error messages to stderr since we don't capture the JVM
+ subprocess stdout/stderr for doctests.
+ """
+ import doctest
+ globs = _setupTest()
+ (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
+ globs['sc'].stop()
+ if failure_count:
+  exit(-1)
+
+import sys
+if __name__ == "__main__":
+ _test()
+# Hack to support running in nose
+elif sys.stdout != sys.__stdout__:
+ _setupTest()
diff --git a/high_performance_pyspark/simple_perf.py b/high_performance_pyspark/simple_perf.py
new file mode 100644
index 0000000..773ad3e
--- /dev/null
+++ b/high_performance_pyspark/simple_perf.py
@@ -0,0 +1,138 @@
+# When running this example make sure to include the built Scala jar :
+# $SPARK_HOME/bin/pyspark --jars ./target/examples-0.0.1.jar --driver-class-path ./target/examples-0.0.1.jar
+# This example illustrates how to interface Scala and Python code, but caution
+# should be taken as it depends on many private members that may change in
+# future releases of Spark.
+
+from pyspark.sql.types import *
+from pyspark.sql import *
+import timeit
+import time
+
+def generate_scale_data(sqlCtx, rows, numCols):
+ """
+ Generate scale data for the performance test.
+
+ This also illustrates calling custom Scala code from the driver.
+
+ .. Note: This depends on many internal methods and may break between versions.
+
+ # This assumes our jars have been added with export PYSPARK_SUBMIT_ARGS
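+ # e.g. (jar path is an assumption, adjust to your build):
+ #   export PYSPARK_SUBMIT_ARGS='--jars ./target/examples-assembly-0.0.1.jar pyspark-shell'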
+ >>> session = SparkSession.builder.getOrCreate()
+ >>> scaleData = generate_scale_data(session, 100L, 1)
+ >>> scaleData[0].count()
+ 100
+ >>> scaleData[1].count()
+ 100
+ >>> session.stop()
+ """
+ # tag::javaInterop[]
+ sc = sqlCtx._sc
+ # Get the SQL Context, 2.1, 2.0 and pre-2.0 syntax - yay internals :p
+ try:
+  try:
+   javaSqlCtx = sqlCtx._jsqlContext
+  except:
+   javaSqlCtx = sqlCtx._ssql_ctx
+ except:
+  javaSqlCtx = sqlCtx._jwrapped
+ jsc = sc._jsc
+ scalasc = jsc.sc()
+ gateway = sc._gateway
+ # Call a java method that gives us back an RDD of JVM Rows (Int, Double)
+ # While Python RDDs are wrapped Java RDDs (even of Rows) the contents are
+ # different, so we can't directly wrap this.
+ # This returns a Java RDD of Rows - normally it would be better to
+ # return a DataFrame directly, but for illustration we will work
+ # with an RDD of Rows.
+ java_rdd = (gateway.jvm.com.highperformancespark.examples.
+ tools.GenerateScalingData.
+ generateMiniScaleRows(scalasc, rows, numCols))
+ # Schemas are serialized to JSON and sent back and forth
+ # Construct a Python Schema and turn it into a Java Schema
+ schema = StructType([
+ StructField("zip", IntegerType()),
+ StructField("fuzzyness", DoubleType())])
+ # 2.1 / pre-2.1
+ try:
+  jschema = javaSqlCtx.parseDataType(schema.json())
+ except:
+  jschema = sqlCtx._jsparkSession.parseDataType(schema.json())
+ # Convert the Java RDD to Java DataFrame
+ java_dataframe = javaSqlCtx.createDataFrame(java_rdd, jschema)
+ # Wrap the Java DataFrame into a Python DataFrame
+ python_dataframe = DataFrame(java_dataframe, sqlCtx)
+ # Convert the Python DataFrame into an RDD
+ pairRDD = python_dataframe.rdd.map(lambda row: (row[0], row[1]))
+ return (python_dataframe, pairRDD)
+ # end::javaInterop[]
+
+def runOnDF(df):
+ result = df.groupBy("zip").avg("fuzzyness").count()
+ return result
+
+def runOnRDD(rdd):
+ result = rdd.map(lambda (x, y): (x, (y, 1))). \
+ reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])). \
+ count()
+ return result
+
+def groupOnRDD(rdd):
+ return rdd.groupByKey().mapValues(lambda v: sum(v) / float(len(v))).count()
+
+def run(sc, sqlCtx, scalingFactor, size):
+ """
+ Run the simple perf test printing the results to stdout.
+
+ >>> session = SparkSession.builder.getOrCreate()
+ >>> sc = session._sc
+ >>> run(sc, session, 5L, 1)
+ RDD:
+ ...
+ group:
+ ...
+ df:
+ ...
+ yay
+ >>> session.stop()
+ """
+ (input_df, input_rdd) = generate_scale_data(sqlCtx, scalingFactor, size)
+ input_rdd.cache().count()
+ rddTimings = timeit.repeat(stmt=lambda: runOnRDD(input_rdd), repeat=10, number=1, timer=time.time, setup='gc.enable()')
+ groupTimings = timeit.repeat(stmt=lambda: groupOnRDD(input_rdd), repeat=10, number=1, timer=time.time, setup='gc.enable()')
+ input_df.cache().count()
+ dfTimings = timeit.repeat(stmt=lambda: runOnDF(input_df), repeat=10, number=1, timer=time.time, setup='gc.enable()')
+ print "RDD:"
+ print rddTimings
+ print "group:"
+ print groupTimings
+ print "df:"
+ print dfTimings
+ print "yay"
+
+def parseArgs(args):
+ """
+ Parse the args, no error checking.
+
+ >>> parseArgs(["foobaz", "1", "2"])
+ (1, 2)
+ """
+ scalingFactor = int(args[1])
+ size = int(args[2])
+ return (scalingFactor, size)
+
+
+if __name__ == "__main__":
+
+ """
+ Usage: simple_perf_test scalingFactor size
+ """
+ import sys
+ from pyspark import SparkContext
+ from pyspark.sql import SQLContext
+ (scalingFactor, size) = parseArgs(sys.argv)
+ session = SparkSession.builder.appName("SimplePythonPerf").getOrCreate()
+ sc = session._sc
+ run(sc, session, scalingFactor, size)
+
+ sc.stop()
diff --git a/project/plugins.sbt b/project/plugins.sbt
new file mode 100644
index 0000000..26c430e
--- /dev/null
+++ b/project/plugins.sbt
@@ -0,0 +1,22 @@
+addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.8.0")
+
+resolvers += "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/"
+
+resolvers += "sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/"
+
+
+//tag::addSparkPackagesPlugin[]
+resolvers += "Spark Package Main Repo" at "https://dl.bintray.com/spark-packages/maven"
+
+addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.5")
+//end::addSparkPackagesPlugin[]
+
+//addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
+
+addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0")
+
+//tag::sbtJNIPlugin[]
+addSbtPlugin("ch.jodersky" %% "sbt-jni" % "1.0.0-RC3")
+//end::sbtJNIPlugin[]
+
+addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.8.0")
diff --git a/resources/mysql-connector-java-5.1.38.jar b/resources/mysql-connector-java-5.1.38.jar
new file mode 100644
index 0000000..be09493
Binary files /dev/null and b/resources/mysql-connector-java-5.1.38.jar differ
diff --git a/resources/rawpanda.json b/resources/rawpanda.json
new file mode 100644
index 0000000..1d9940d
--- /dev/null
+++ b/resources/rawpanda.json
@@ -0,0 +1,2 @@
+{"name":"mission","pandas":[{"id":1,"zip":"94110","pt":"giant", "happy":true,
+ "attributes":[0.4,0.5]}]}
diff --git a/sbt/sbt b/sbt/sbt
new file mode 100755
index 0000000..aac1085
--- /dev/null
+++ b/sbt/sbt
@@ -0,0 +1,52 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script launches sbt for this project. If present it uses the system
+# version of sbt. If there is no system version of sbt it attempts to download
+# sbt locally.
+SBT_VERSION=0.13.9
+URL1=http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
+URL2=http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
+JAR=sbt/sbt-launch-${SBT_VERSION}.jar
+
+# Download sbt launch jar if it hasn't been downloaded yet
+if [ ! -f ${JAR} ]; then
+ # Download
+ printf "Attempting to fetch sbt\n"
+ set -x
+ JAR_DL=${JAR}.part
+ if hash wget 2>/dev/null; then
+ (wget --progress=bar ${URL1} -O ${JAR_DL} || wget --progress=bar ${URL2} -O ${JAR_DL}) && mv ${JAR_DL} ${JAR}
+ elif hash axel 2>/dev/null; then
+ (axel ${URL1} -o ${JAR_DL} || axel ${URL2} -o ${JAR_DL}) && mv ${JAR_DL} ${JAR}
+ else
+ printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n"
+ exit -1
+ fi
+fi
+if [ ! -f ${JAR} ]; then
+ # We failed to download
+ printf "Our attempt to download sbt locally to ${JAR} failed. Please install sbt manually from http://www.scala-sbt.org/\n"
+ exit -1
+fi
+printf "Launching sbt from ${JAR}\n"
+java \
+ -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=256m \
+ -jar ${JAR} \
+ "$@"
diff --git a/sbt/sbt.bat b/sbt/sbt.bat
new file mode 100644
index 0000000..0f7a3e9
--- /dev/null
+++ b/sbt/sbt.bat
@@ -0,0 +1,95 @@
+@REM SBT launcher script
+@REM
+@REM Environment:
+@REM JAVA_HOME - location of a JDK home dir (mandatory)
+@REM SBT_OPTS - JVM options (optional)
+@REM Configuration:
+@REM sbtconfig.txt found in ..\conf relative to SBT_HOME.
+
+@REM ZOMG! We need delayed expansion to build up CFG_OPTS later
+@setlocal enabledelayedexpansion
+
+@echo off
+set SBT_HOME=%~dp0
+
+rem FIRST we load the config file of extra options.
+set FN=%SBT_HOME%\..\conf\sbtconfig.txt
+set CFG_OPTS=
+FOR /F "tokens=* eol=# usebackq delims=" %%i IN ("%FN%") DO (
+ set DO_NOT_REUSE_ME=%%i
+ rem ZOMG (Part #2) WE use !! here to delay the expansion of
+ rem CFG_OPTS, otherwise it remains "" for this loop.
+ set CFG_OPTS=!CFG_OPTS! !DO_NOT_REUSE_ME!
+)
+
+rem poor man's jenv (which is not available on Windows)
+IF DEFINED JAVA_HOMES (
+ IF EXIST .java-version FOR /F %%A IN (.java-version) DO (
+ SET JAVA_HOME=%JAVA_HOMES%\%%A
+ SET JDK_HOME=%JAVA_HOMES%\%%A
+ )
+)
+rem must set PATH or wrong javac is used for java projects
+IF DEFINED JAVA_HOME SET "PATH=%JAVA_HOME%\bin;%PATH%"
+
+rem users can set JAVA_OPTS via .jvmopts (sbt-extras style)
+IF EXIST .jvmopts FOR /F %%A IN (.jvmopts) DO (
+ SET JAVA_OPTS=%%A !JAVA_OPTS!
+)
+
+rem We use the value of the JAVACMD environment variable if defined
+set _JAVACMD=%JAVACMD%
+
+if "%_JAVACMD%"=="" (
+ if not "%JAVA_HOME%"=="" (
+ if exist "%JAVA_HOME%\bin\java.exe" set "_JAVACMD=%JAVA_HOME%\bin\java.exe"
+ )
+)
+
+if "%_JAVACMD%"=="" set _JAVACMD=java
+
+rem We use the value of the JAVA_OPTS environment variable if defined, rather than the config.
+set _JAVA_OPTS=%JAVA_OPTS%
+if "%_JAVA_OPTS%"=="" set _JAVA_OPTS=%CFG_OPTS%
+
+:args_loop
+if "%~1" == "" goto args_end
+
+if "%~1" == "-jvm-debug" (
+ set JVM_DEBUG=true
+ set /a JVM_DEBUG_PORT=5005 2>nul >nul
+) else if "!JVM_DEBUG!" == "true" (
+ set /a JVM_DEBUG_PORT=%1 2>nul >nul
+ if not "%~1" == "!JVM_DEBUG_PORT!" (
+ set SBT_ARGS=!SBT_ARGS! %1
+ )
+) else (
+ set SBT_ARGS=!SBT_ARGS! %1
+)
+
+shift
+goto args_loop
+:args_end
+
+if defined JVM_DEBUG_PORT (
+ set _JAVA_OPTS=!_JAVA_OPTS! -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=!JVM_DEBUG_PORT!
+)
+
+call :run %SBT_ARGS%
+
+if ERRORLEVEL 1 goto error
+goto end
+
+:run
+
+"%_JAVACMD%" %_JAVA_OPTS% %SBT_OPTS% -cp "%SBT_HOME%sbt-launch.jar" xsbt.boot.Boot %*
+goto :eof
+
+:error
+@endlocal
+exit /B 1
+
+
+:end
+@endlocal
+exit /B 0
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
new file mode 100644
index 0000000..17d780a
--- /dev/null
+++ b/scalastyle-config.xml
@@ -0,0 +1,117 @@
+
+ Scalastyle standard configuration
+
diff --git a/shell-scripts/launch-with-mysql-jdbc b/shell-scripts/launch-with-mysql-jdbc
new file mode 100644
index 0000000..90ac352
--- /dev/null
+++ b/shell-scripts/launch-with-mysql-jdbc
@@ -0,0 +1,5 @@
+ASSEMBLY_JAR=./target/scala-2.10/examples_2.10.jar
+CLASS="com.highperformancespark.dataframe.mysqlload"
+#tag::submit[]
+spark-submit --class $CLASS --jars ./resources/mysql-connector-java-5.1.38.jar $ASSEMBLY_JAR
+#end::submit[]
\ No newline at end of file
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
new file mode 100644
index 0000000..e88b326
--- /dev/null
+++ b/src/CMakeLists.txt
@@ -0,0 +1,74 @@
+################################################################
+# A minimal CMake file that is compatible with sbt-jni #
+# #
+# All settings required by sbt-jni have been marked so, please #
+# add/modify/remove settings to build your specific library. #
+################################################################
+
+cmake_minimum_required(VERSION 2.6)
+
+# Define project and related variables
+#
+project (high-performance-spark)
+
+# Enable Fortran
+enable_language (Fortran)
+include(FortranCInterface)
+
+
+# FFLAGS depend on the compiler
+get_filename_component (Fortran_COMPILER_NAME ${CMAKE_Fortran_COMPILER} NAME)
+
+
+# Set versions and library name
+# (required by sbt-jni) please use semantic versioning
+#
+set (VERSION_MAJOR 0)
+set (VERSION_MINOR 0)
+set (VERSION_PATCH 0)
+# (required by sbt-jni) major version will always be appended to library name
+set (LIB_NAME ${CMAKE_PROJECT_NAME}${VERSION_MAJOR})
+
+# Command-line options
+#
+# (set by sbt-jni)
+set (LIB_INSTALL_DIR lib CACHE PATH "Path in which to install libraries (equivalent to Autoconf --libdir).")
+# (set by sbt-jni)
+set (LIB_ENABLE_MINOR_VERSIONS ON CACHE BOOLEAN "Build libraries with minor and patch versions appended.")
+
+# Setup JNI
+find_package(JNI REQUIRED)
+if (JNI_FOUND)
+ message (STATUS "JNI include directories: ${JNI_INCLUDE_DIRS}")
+endif()
+
+# Include directories
+include_directories(.)
+include_directories(./main/c)
+include_directories(./main/c/include)
+include_directories(${JNI_INCLUDE_DIRS})
+
+# Setup main shared library
+file(GLOB LIB_SRC
+ "*.c"
+ "*.cpp"
+ "./main/c/*.c"
+ "./main/c/*.cpp"
+ "./main/fortran/*.f*"
+)
+add_library(${LIB_NAME} SHARED ${LIB_SRC})
+
+# By default, in a regular build, minor and patch versions are added to the generated files.
+# When built through sbt-jni however, LIB_ENABLE_MINOR_VERSIONS is deactivated and only a
+# major-versioned library file is built.
+if (LIB_ENABLE_MINOR_VERSIONS)
+ set_target_properties(
+ ${LIB_NAME}
+ PROPERTIES
+ VERSION 0.${VERSION_MINOR}.${VERSION_PATCH} # major version always 0, it is included in library name
+ SOVERSION 0
+ )
+endif()
+
+# Installation targets
+install(TARGETS ${LIB_NAME} LIBRARY DESTINATION ${LIB_INSTALL_DIR})
diff --git a/src/main/c/include/com_highperformancespark_examples_ffi_SumJNI.h b/src/main/c/include/com_highperformancespark_examples_ffi_SumJNI.h
new file mode 100644
index 0000000..75be264
--- /dev/null
+++ b/src/main/c/include/com_highperformancespark_examples_ffi_SumJNI.h
@@ -0,0 +1,21 @@
+/* DO NOT EDIT THIS FILE - it is machine generated */
+#include <jni.h>
+/* Header for class com_highperformancespark_examples_ffi_SumJNI */
+
+#ifndef _Included_com_highperformancespark_examples_ffi_SumJNI
+#define _Included_com_highperformancespark_examples_ffi_SumJNI
+#ifdef __cplusplus
+extern "C" {
+#endif
+/*
+ * Class: com_highperformancespark_examples_ffi_SumJNI
+ * Method: sum
+ * Signature: ([I)I
+ */
+JNIEXPORT jint JNICALL Java_com_highperformancespark_examples_ffi_SumJNI_sum
+ (JNIEnv *, jobject, jintArray);
+
+#ifdef __cplusplus
+}
+#endif
+#endif
diff --git a/src/main/c/sum.c b/src/main/c/sum.c
new file mode 100644
index 0000000..f571aad
--- /dev/null
+++ b/src/main/c/sum.c
@@ -0,0 +1,9 @@
+#include "sum.h"
+
+int sum(int input[], int num_elem) {
+ int c, ret = 0;
+ for (c = 0; c < num_elem; c++) {
+ ret += input[c];
+ }
+ return ret;
+}
diff --git a/src/main/c/sum.h b/src/main/c/sum.h
new file mode 100644
index 0000000..d04be96
--- /dev/null
+++ b/src/main/c/sum.h
@@ -0,0 +1,6 @@
+#ifndef _SUM_H
+#define _SUM_H
+
+int sum(int input[], int num_elem);
+
+#endif /* _SUM_H */
diff --git a/src/main/c/sum_wrapper.c b/src/main/c/sum_wrapper.c
new file mode 100644
index 0000000..a499d3e
--- /dev/null
+++ b/src/main/c/sum_wrapper.c
@@ -0,0 +1,16 @@
+#include "sum.h"
+#include "include/com_highperformancespark_examples_ffi_SumJNI.h"
+#include <stdio.h>
+#include <stdlib.h>
+
+/*
+ * Class: com_highperformancespark_examples_ffi_SumJNI
+ * Method: sum
+ * Signature: ([I)I
+ */
+JNIEXPORT jint JNICALL Java_com_highperformancespark_examples_ffi_SumJNI_sum
+(JNIEnv *env, jobject obj, jintArray ja) {
+ jsize size = (*env)->GetArrayLength(env, ja);
+ jint *a = (*env)->GetIntArrayElements(env, ja, 0);
+ jint result = sum(a, size);
+ // Release the pinned/copied elements; JNI_ABORT since we only read them
+ (*env)->ReleaseIntArrayElements(env, ja, a, JNI_ABORT);
+ return result;
+}
diff --git a/src/main/c/sumf_wrapper.c b/src/main/c/sumf_wrapper.c
new file mode 100644
index 0000000..43c7da0
--- /dev/null
+++ b/src/main/c/sumf_wrapper.c
@@ -0,0 +1,7 @@
+// Fortran routine
+extern int sumf(int *, int[]);
+
+// Call the fortran code which expects by reference size
+int wrap_sum(int input[], int size) {
+ return sumf(&size, input);
+}
diff --git a/src/main/fortran/sumf.f95 b/src/main/fortran/sumf.f95
new file mode 100644
index 0000000..04680b7
--- /dev/null
+++ b/src/main/fortran/sumf.f95
@@ -0,0 +1,4 @@
+ INTEGER FUNCTION SUMF(N,A) BIND(C, NAME='sumf')
+ INTEGER A(N)
+ SUMF=SUM(A)
+ END
diff --git a/src/main/java/com/highperformancespark/examples/JavaInterop.java b/src/main/java/com/highperformancespark/examples/JavaInterop.java
new file mode 100644
index 0000000..3e3ed6b
--- /dev/null
+++ b/src/main/java/com/highperformancespark/examples/JavaInterop.java
@@ -0,0 +1,38 @@
+package com.highperformancespark.examples;
+
+import scala.reflect.*;
+import scala.Tuple2;
+
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.spark.sql.functions.*;
+
+public class JavaInterop {
+
+ //tag::realClassTag[]
+ public static JavaPairRDD<String, Object> wrapPairRDD(
+   RDD<Tuple2<String, Object>> rdd) {
+  // Construct the class tags
+  ClassTag<String> strCt = ClassTag$.MODULE$.apply(String.class);
+  ClassTag<Object> longCt = ClassTag$.MODULE$.apply(scala.Long.class);
+  return new JavaPairRDD<String, Object>(rdd, strCt, longCt);
+ }
+ //end::realClassTag[]
+
+ //tag::fakeClassTag[]
+ public static JavaPairRDD<String, Object> wrapPairRDDFakeCt(
+   RDD<Tuple2<String, Object>> rdd) {
+  // Construct the class tags by casting AnyRef - this would be more commonly done
+  // with generic or templated code where we can't explicitly construct the correct
+  // class tag. Note that using fake class tags may result in degraded performance.
+ ClassTag