Skip to content

Commit 4d6f654

Browse files
committed
[SPARK-16318][SQL] Implement all remaining xpath functions
This patch implements all remaining xpath functions that Hive supports and not natively supported in Spark: xpath_int, xpath_short, xpath_long, xpath_float, xpath_double, xpath_string, and xpath. Added unit tests and end-to-end tests. Author: petermaxlee <[email protected]> Closes apache#13991 from petermaxlee/SPARK-16318.
1 parent 139d5ea commit 4d6f654

File tree

9 files changed

+437
-135
lines changed

9 files changed

+437
-135
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtil.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.io.IOException;
2121
import java.io.Reader;
22-
import java.io.StringReader;
2322

2423
import javax.xml.namespace.QName;
2524
import javax.xml.xpath.XPath;
@@ -44,7 +43,7 @@ public class UDFXPathUtil {
4443
private XPathExpression expression = null;
4544
private String oldPath = null;
4645

47-
public Object eval(String xml, String path, QName qname) {
46+
public Object eval(String xml, String path, QName qname) throws XPathExpressionException {
4847
if (xml == null || path == null || qname == null) {
4948
return null;
5049
}
@@ -57,7 +56,7 @@ public Object eval(String xml, String path, QName qname) {
5756
try {
5857
expression = xpath.compile(path);
5958
} catch (XPathExpressionException e) {
60-
expression = null;
59+
throw new RuntimeException("Invalid XPath '" + path + "'" + e.getMessage(), e);
6160
}
6261
oldPath = path;
6362
}
@@ -67,36 +66,35 @@ public Object eval(String xml, String path, QName qname) {
6766
}
6867

6968
reader.set(xml);
70-
7169
try {
7270
return expression.evaluate(inputSource, qname);
7371
} catch (XPathExpressionException e) {
74-
throw new RuntimeException ("Invalid expression '" + oldPath + "'", e);
72+
throw new RuntimeException("Invalid XML document: " + e.getMessage() + "\n" + xml, e);
7573
}
7674
}
7775

78-
public Boolean evalBoolean(String xml, String path) {
76+
public Boolean evalBoolean(String xml, String path) throws XPathExpressionException {
7977
return (Boolean) eval(xml, path, XPathConstants.BOOLEAN);
8078
}
8179

82-
public String evalString(String xml, String path) {
80+
public String evalString(String xml, String path) throws XPathExpressionException {
8381
return (String) eval(xml, path, XPathConstants.STRING);
8482
}
8583

86-
public Double evalNumber(String xml, String path) {
84+
public Double evalNumber(String xml, String path) throws XPathExpressionException {
8785
return (Double) eval(xml, path, XPathConstants.NUMBER);
8886
}
8987

90-
public Node evalNode(String xml, String path) {
88+
public Node evalNode(String xml, String path) throws XPathExpressionException {
9189
return (Node) eval(xml, path, XPathConstants.NODE);
9290
}
9391

94-
public NodeList evalNodeList(String xml, String path) {
92+
public NodeList evalNodeList(String xml, String path) throws XPathExpressionException {
9593
return (NodeList) eval(xml, path, XPathConstants.NODESET);
9694
}
9795

9896
/**
99-
* Reusable, non-threadsafe version of {@link StringReader}.
97+
* Reusable, non-threadsafe version of {@link java.io.StringReader}.
10098
*/
10199
public static class ReusableStringReader extends Reader {
102100

@@ -117,29 +115,32 @@ public void set(String s) {
117115

118116
/** Check to make sure that the stream has not been closed */
119117
private void ensureOpen() throws IOException {
120-
if (str == null)
118+
if (str == null) {
121119
throw new IOException("Stream closed");
120+
}
122121
}
123122

124123
@Override
125124
public int read() throws IOException {
126125
ensureOpen();
127-
if (next >= length)
126+
if (next >= length) {
128127
return -1;
128+
}
129129
return str.charAt(next++);
130130
}
131131

132132
@Override
133-
public int read(char cbuf[], int off, int len) throws IOException {
133+
public int read(char[] cbuf, int off, int len) throws IOException {
134134
ensureOpen();
135135
if ((off < 0) || (off > cbuf.length) || (len < 0)
136136
|| ((off + len) > cbuf.length) || ((off + len) < 0)) {
137137
throw new IndexOutOfBoundsException();
138138
} else if (len == 0) {
139139
return 0;
140140
}
141-
if (next >= length)
141+
if (next >= length) {
142142
return -1;
143+
}
143144
int n = Math.min(length - next, len);
144145
str.getChars(next, next + n, cbuf, off);
145146
next += n;
@@ -149,8 +150,9 @@ public int read(char cbuf[], int off, int len) throws IOException {
149150
@Override
150151
public long skip(long ns) throws IOException {
151152
ensureOpen();
152-
if (next >= length)
153+
if (next >= length) {
153154
return 0;
155+
}
154156
// Bound skip by beginning and end of the source
155157
long n = Math.min(length - next, ns);
156158
n = Math.max(-next, n);

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,15 @@ object FunctionRegistry {
310310
expression[UnBase64]("unbase64"),
311311
expression[Unhex]("unhex"),
312312
expression[Upper]("upper"),
313+
expression[XPathList]("xpath"),
313314
expression[XPathBoolean]("xpath_boolean"),
315+
expression[XPathDouble]("xpath_double"),
316+
expression[XPathDouble]("xpath_number"),
317+
expression[XPathFloat]("xpath_float"),
318+
expression[XPathInt]("xpath_int"),
319+
expression[XPathLong]("xpath_long"),
320+
expression[XPathShort]("xpath_short"),
321+
expression[XPathString]("xpath_string"),
314322

315323
// datetime functions
316324
expression[AddMonths]("add_months"),

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/XPathBoolean.scala

Lines changed: 0 additions & 58 deletions
This file was deleted.
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.expressions.xml
19+
20+
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
21+
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
22+
import org.apache.spark.sql.catalyst.expressions._
23+
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
24+
import org.apache.spark.sql.catalyst.util.GenericArrayData
25+
import org.apache.spark.sql.types._
26+
import org.apache.spark.unsafe.types.UTF8String
27+
28+
/**
29+
* Base class for xpath_boolean, xpath_double, xpath_int, etc.
30+
*
31+
* This is not the world's most efficient implementation due to type conversion, but works.
32+
*/
33+
abstract class XPathExtract extends BinaryExpression with ExpectsInputTypes with CodegenFallback {
34+
override def left: Expression = xml
35+
override def right: Expression = path
36+
37+
/** XPath expressions are always nullable, e.g. if the xml string is empty. */
38+
override def nullable: Boolean = true
39+
40+
override def inputTypes: Seq[AbstractDataType] = Seq(StringType, StringType)
41+
42+
override def checkInputDataTypes(): TypeCheckResult = {
43+
if (!path.foldable) {
44+
TypeCheckFailure("path should be a string literal")
45+
} else {
46+
super.checkInputDataTypes()
47+
}
48+
}
49+
50+
@transient protected lazy val xpathUtil = new UDFXPathUtil
51+
@transient protected lazy val pathString: String = path.eval().asInstanceOf[UTF8String].toString
52+
53+
/** Concrete implementations need to override the following three methods. */
54+
def xml: Expression
55+
def path: Expression
56+
}
57+
58+
@ExpressionDescription(
59+
usage = "_FUNC_(xml, xpath) - Evaluates a boolean xpath expression.",
60+
extended = "> SELECT _FUNC_('<a><b>1</b></a>','a/b');\ntrue")
61+
case class XPathBoolean(xml: Expression, path: Expression) extends XPathExtract {
62+
63+
override def prettyName: String = "xpath_boolean"
64+
override def dataType: DataType = BooleanType
65+
66+
override def nullSafeEval(xml: Any, path: Any): Any = {
67+
xpathUtil.evalBoolean(xml.asInstanceOf[UTF8String].toString, pathString)
68+
}
69+
}
70+
71+
@ExpressionDescription(
72+
usage = "_FUNC_(xml, xpath) - Returns a short value that matches the xpath expression",
73+
extended = "> SELECT _FUNC_('<a><b>1</b><b>2</b></a>','sum(a/b)');\n3")
74+
case class XPathShort(xml: Expression, path: Expression) extends XPathExtract {
75+
override def prettyName: String = "xpath_int"
76+
override def dataType: DataType = ShortType
77+
78+
override def nullSafeEval(xml: Any, path: Any): Any = {
79+
val ret = xpathUtil.evalNumber(xml.asInstanceOf[UTF8String].toString, pathString)
80+
if (ret eq null) null else ret.shortValue()
81+
}
82+
}
83+
84+
@ExpressionDescription(
85+
usage = "_FUNC_(xml, xpath) - Returns an integer value that matches the xpath expression",
86+
extended = "> SELECT _FUNC_('<a><b>1</b><b>2</b></a>','sum(a/b)');\n3")
87+
case class XPathInt(xml: Expression, path: Expression) extends XPathExtract {
88+
override def prettyName: String = "xpath_int"
89+
override def dataType: DataType = IntegerType
90+
91+
override def nullSafeEval(xml: Any, path: Any): Any = {
92+
val ret = xpathUtil.evalNumber(xml.asInstanceOf[UTF8String].toString, pathString)
93+
if (ret eq null) null else ret.intValue()
94+
}
95+
}
96+
97+
@ExpressionDescription(
98+
usage = "_FUNC_(xml, xpath) - Returns a long value that matches the xpath expression",
99+
extended = "> SELECT _FUNC_('<a><b>1</b><b>2</b></a>','sum(a/b)');\n3")
100+
case class XPathLong(xml: Expression, path: Expression) extends XPathExtract {
101+
override def prettyName: String = "xpath_long"
102+
override def dataType: DataType = LongType
103+
104+
override def nullSafeEval(xml: Any, path: Any): Any = {
105+
val ret = xpathUtil.evalNumber(xml.asInstanceOf[UTF8String].toString, pathString)
106+
if (ret eq null) null else ret.longValue()
107+
}
108+
}
109+
110+
@ExpressionDescription(
111+
usage = "_FUNC_(xml, xpath) - Returns a float value that matches the xpath expression",
112+
extended = "> SELECT _FUNC_('<a><b>1</b><b>2</b></a>','sum(a/b)');\n3.0")
113+
case class XPathFloat(xml: Expression, path: Expression) extends XPathExtract {
114+
override def prettyName: String = "xpath_float"
115+
override def dataType: DataType = FloatType
116+
117+
override def nullSafeEval(xml: Any, path: Any): Any = {
118+
val ret = xpathUtil.evalNumber(xml.asInstanceOf[UTF8String].toString, pathString)
119+
if (ret eq null) null else ret.floatValue()
120+
}
121+
}
122+
123+
@ExpressionDescription(
124+
usage = "_FUNC_(xml, xpath) - Returns a double value that matches the xpath expression",
125+
extended = "> SELECT _FUNC_('<a><b>1</b><b>2</b></a>','sum(a/b)');\n3.0")
126+
case class XPathDouble(xml: Expression, path: Expression) extends XPathExtract {
127+
override def prettyName: String = "xpath_float"
128+
override def dataType: DataType = DoubleType
129+
130+
override def nullSafeEval(xml: Any, path: Any): Any = {
131+
val ret = xpathUtil.evalNumber(xml.asInstanceOf[UTF8String].toString, pathString)
132+
if (ret eq null) null else ret.doubleValue()
133+
}
134+
}
135+
136+
// scalastyle:off line.size.limit
137+
@ExpressionDescription(
138+
usage = "_FUNC_(xml, xpath) - Returns the text contents of the first xml node that matches the xpath expression",
139+
extended = "> SELECT _FUNC_('<a><b>b</b><c>cc</c></a>','a/c');\ncc")
140+
// scalastyle:on line.size.limit
141+
case class XPathString(xml: Expression, path: Expression) extends XPathExtract {
142+
override def prettyName: String = "xpath_string"
143+
override def dataType: DataType = StringType
144+
145+
override def nullSafeEval(xml: Any, path: Any): Any = {
146+
val ret = xpathUtil.evalString(xml.asInstanceOf[UTF8String].toString, pathString)
147+
UTF8String.fromString(ret)
148+
}
149+
}
150+
151+
// scalastyle:off line.size.limit
152+
@ExpressionDescription(
153+
usage = "_FUNC_(xml, xpath) - Returns a string array of values within xml nodes that match the xpath expression",
154+
extended = "> SELECT _FUNC_('<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>','a/b/text()');\n['b1','b2','b3']")
155+
// scalastyle:on line.size.limit
156+
case class XPathList(xml: Expression, path: Expression) extends XPathExtract {
157+
override def prettyName: String = "xpath"
158+
override def dataType: DataType = ArrayType(StringType, containsNull = false)
159+
160+
override def nullSafeEval(xml: Any, path: Any): Any = {
161+
val nodeList = xpathUtil.evalNodeList(xml.asInstanceOf[UTF8String].toString, pathString)
162+
if (nodeList ne null) {
163+
val ret = new Array[UTF8String](nodeList.getLength)
164+
var i = 0
165+
while (i < nodeList.getLength) {
166+
ret(i) = UTF8String.fromString(nodeList.item(i).getNodeValue)
167+
i += 1
168+
}
169+
new GenericArrayData(ret)
170+
} else {
171+
null
172+
}
173+
}
174+
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtilSuite.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,9 @@ class UDFXPathUtilSuite extends SparkFunSuite {
4343
assert(util.eval("<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>", "", STRING) == null)
4444

4545
// wrong expression:
46-
assert(
47-
util.eval("<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/text(", STRING) == null)
46+
intercept[RuntimeException] {
47+
util.eval("<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/text(", STRING)
48+
}
4849
}
4950

5051
test("generic eval") {

0 commit comments

Comments
 (0)