diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtilSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtilSuite.scala index a8dc2b20f56d..8351e94c0c36 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtilSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtilSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.expressions.xml +import java.nio.charset.StandardCharsets import javax.xml.xpath.XPathConstants.STRING import org.w3c.dom.Node @@ -85,7 +86,7 @@ class UDFXPathUtilSuite extends SparkFunSuite { tempFile.deleteOnExit() val fname = tempFile.getAbsolutePath - FileUtils.writeStringToFile(tempFile, secretValue) + FileUtils.writeStringToFile(tempFile, secretValue, StandardCharsets.UTF_8) val xml = s""" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlInputFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlInputFormat.scala index 4359ac02f5f5..6169cec6f821 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlInputFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlInputFormat.scala @@ -20,7 +20,7 @@ import java.io.{InputStream, InputStreamReader, IOException, Reader} import java.nio.ByteBuffer import java.nio.charset.Charset -import org.apache.commons.io.input.CountingInputStream +import org.apache.commons.io.input.BoundedInputStream import org.apache.hadoop.fs.Seekable import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.io.compress._ @@ -67,7 +67,7 @@ private[xml] class XmlRecordReader extends RecordReader[LongWritable, Text] { private var end: Long = _ private var reader: Reader = _ private var filePosition: Seekable = _ - private var countingIn: CountingInputStream = _ + private var countingIn: BoundedInputStream = _ private var readerLeftoverCharFn: () => Boolean = _ private var readerByteBuffer: ByteBuffer = _ private var decompressor: Decompressor = _ @@ -117,7 +117,9 @@ private[xml] class XmlRecordReader extends RecordReader[LongWritable, Text] { } } else { fsin.seek(start) - countingIn = new CountingInputStream(fsin) + countingIn = BoundedInputStream.builder() + .setInputStream(fsin) + .get() in = countingIn // don't use filePosition in this case. We have to count bytes read manually } @@ -156,7 +158,7 @@ private[xml] class XmlRecordReader extends RecordReader[LongWritable, Text] { if (filePosition != null) { filePosition.getPos } else { - start + countingIn.getByteCount - + start + countingIn.getCount - readerByteBuffer.remaining() - (if (readerLeftoverCharFn()) 1 else 0) }