Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Move Parquet log redirection code to a serializable Java class
ParquetLogRedirector which performs the redirection as part of its class
initialization
  • Loading branch information
Michael Allman committed Nov 9, 2016
commit 247ef91dc444bcba0751e9a7d76938c2bea07097
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.parquet;

import java.io.Serializable;
import java.util.logging.Handler;
import java.util.logging.Logger;

import org.apache.parquet.Log;
import org.slf4j.bridge.SLF4JBridgeHandler;

/**
 * Redirects the JUL (java.util.logging) output of parquet-mr versions <= 1.8 to SLF4J logging
 * using {@link SLF4JBridgeHandler}. Parquet-mr versions >= 1.9 use SLF4J directly.
 *
 * The redirection is performed by this class's static initializer, i.e. once, when the class is
 * initialized.
 */
final class ParquetLogRedirector implements Serializable {
  // Client classes should hold a reference to INSTANCE to ensure redirection occurs. This is
  // especially important for Serializable classes where fields are set but constructors are
  // ignored
  static final ParquetLogRedirector INSTANCE = new ParquetLogRedirector();

  // JUL loggers must be held by a strong reference, otherwise they may get destroyed by GC.
  // However, the root JUL logger used by Parquet isn't properly referenced. Here we keep
  // references to loggers in both parquet-mr <= 1.6 and 1.7/1.8
  private static final Logger apacheParquetLogger =
    Logger.getLogger(Log.class.getPackage().getName());
  private static final Logger parquetLogger = Logger.getLogger("parquet");

  static {
    // For parquet-mr 1.7 and 1.8, which are under `org.apache.parquet` namespace.
    try {
      // Force Parquet's own Log class to initialize first, so that whatever handlers it installs
      // are in place before we replace them.
      Class.forName(Log.class.getName());
      // Reuse the strongly-held logger instead of looking it up by name a second time.
      redirect(apacheParquetLogger);
    } catch (ClassNotFoundException ex) {
      throw new RuntimeException(ex);
    }

    // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet`
    // namespace.
    try {
      Class.forName("parquet.Log");
      redirect(parquetLogger);
    } catch (Throwable t) {
      // Deliberately ignored (best effort).
      // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly
      // when Spark is built with SBT. So `parquet.Log` may not be found. This try/catch block
      // should be removed after this issue is fixed.
    }
  }

  private ParquetLogRedirector() {
  }

  /**
   * Serialization hook: deserialization bypasses the private constructor and would otherwise
   * produce additional instances of this singleton. Every deserialized copy is replaced by
   * {@link #INSTANCE}.
   *
   * @return the singleton instance
   */
  private Object readResolve() {
    return INSTANCE;
  }

  /**
   * Removes every handler currently attached to {@code logger}, stops it from delegating to its
   * parent handlers, and attaches a single {@link SLF4JBridgeHandler}.
   *
   * @param logger the JUL logger to route through SLF4J
   */
  private static void redirect(Logger logger) {
    for (Handler handler : logger.getHandlers()) {
      logger.removeHandler(handler);
    }
    logger.setUseParentHandlers(false);
    logger.addHandler(new SLF4JBridgeHandler());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

package org.apache.spark.sql.execution.datasources.parquet

import java.io.ObjectInputStream
import java.net.URI
import java.util.logging.{Logger => JLogger}

import scala.collection.JavaConverters._
import scala.collection.mutable
Expand All @@ -30,14 +28,12 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.{Log => ApacheParquetLog}
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.codec.CodecConfig
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.parquet.schema.MessageType
import org.slf4j.bridge.SLF4JBridgeHandler

import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.internal.Logging
Expand All @@ -57,21 +53,11 @@ class ParquetFileFormat
with DataSourceRegister
with Logging
with Serializable {
// Stand-in for a static initializer, which Scala lacks: the Parquet log redirection held in
// `ParquetFileFormat.redirectParquetLogsViaSLF4J` has to be initialized before anything touches
// the Parquet libraries. Instead of relying on clients to initialize the `ParquetFileFormat`
// singleton object at the right moment, the initialization is triggered from this class's
// constructor. The call is idempotent and is essentially a no-op after the first invocation.
ParquetFileFormat.ensureParquetLogRedirection()

// Deserialization hook. Java serialization does not call the default constructor, so the call
// to ParquetFileFormat.ensureParquetLogRedirection made there is repeated here, after the
// default field deserialization.
private def readObject(in: ObjectInputStream): Unit = {
  in.defaultReadObject()
  ParquetFileFormat.ensureParquetLogRedirection()
}
// Hold a reference to the (serializable) singleton instance of ParquetLogRedirector, whose
// static initializer performs the Parquet log redirection. Keeping the reference in a field
// ensures the ParquetLogRedirector class is initialized whether an instance of
// ParquetFileFormat is constructed or deserialized. The field is intentionally never read: do
// not heed the Scala compiler's warning about an unused field here.
private val parquetLogRedirector = ParquetLogRedirector.INSTANCE

override def shortName(): String = "parquet"

Expand Down Expand Up @@ -146,12 +132,11 @@ class ParquetFileFormat
}

new OutputWriterFactory {
// Deserialization hook. In the write path this OutputWriterFactory is deserialized on the
// executor side before any output is actually written, so Parquet logs are redirected at that
// point, right after the default field deserialization.
private def readObject(in: ObjectInputStream): Unit = {
  in.defaultReadObject()
  ParquetFileFormat.ensureParquetLogRedirection()
}
// This OutputWriterFactory instance is deserialized when writing Parquet files on the
// executor side without constructing or deserializing ParquetFileFormat. Therefore, we hold
// another reference to ParquetLogRedirector.INSTANCE here to ensure the latter class (and with
// it the log redirection done in its static initializer) is initialized. The field is
// intentionally never read.
private val parquetLogRedirector = ParquetLogRedirector.INSTANCE

override def newInstance(
path: String,
Expand Down Expand Up @@ -694,45 +679,4 @@ object ParquetFileFormat extends Logging {
Failure(cause)
}.toOption
}

// JUL loggers must be held by a strong reference, otherwise they may get destroyed by GC.
// However, the root JUL logger used by Parquet isn't properly referenced. Here we keep
// references to the loggers of both parquet-mr <= 1.6 (`parquet` namespace) and
// parquet-mr >= 1.7 (`org.apache.parquet` namespace).
val apacheParquetLogger: JLogger = JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName)
val parquetLogger: JLogger = JLogger.getLogger("parquet")

// Parquet sets up its own JUL logger in a static block, and that logger always prints to
// stdout. Evaluating this val (once, when the enclosing object is initialized) replaces the
// handlers of Parquet's JUL loggers with the SLF4J JUL bridge handler.
private val redirectParquetLogsViaSLF4J: Unit = {
  // Detaches all current handlers from `julLogger`, stops delegation to parent handlers, and
  // attaches a single SLF4J bridge handler.
  def routeThroughSlf4j(julLogger: JLogger): Unit = {
    for (handler <- julLogger.getHandlers) {
      julLogger.removeHandler(handler)
    }
    julLogger.setUseParentHandlers(false)
    julLogger.addHandler(new SLF4JBridgeHandler())
  }

  // For parquet-mr 1.7.0 and above versions, which are under `org.apache.parquet` namespace.
  val apacheParquetLogClass = classOf[ApacheParquetLog]
  // scalastyle:off classforname
  Class.forName(apacheParquetLogClass.getName)
  // scalastyle:on classforname
  routeThroughSlf4j(JLogger.getLogger(apacheParquetLogClass.getPackage.getName))

  // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet`
  // namespace.
  try {
    // scalastyle:off classforname
    Class.forName("parquet.Log")
    // scalastyle:on classforname
    routeThroughSlf4j(JLogger.getLogger("parquet"))
  } catch {
    case _: Throwable =>
      // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly
      // when Spark is built with SBT. So `parquet.Log` may not be found. This try/catch block
      // should be removed after this issue is fixed.
  }
}

/**
 * Makes sure Parquet library log output is routed through the SLF4J JUL bridge handler by
 * referencing `redirectParquetLogsViaSLF4J`. It is called from the `ParquetFileFormat`
 * constructor and from the deserialization hooks.
 */
private def ensureParquetLogRedirection(): Unit = {
  redirectParquetLogsViaSLF4J
}
}