Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -226,6 +227,8 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
public static final String PG_BIGINT_ARRAY = "_int8";
public static final String PG_REAL = "float4";
public static final String PG_REAL_ARRAY = "_float4";
public static final String PG_DECIMAL = "decimal";
public static final String PG_DECIMAL_ARRAY = "_decimal";
public static final String PG_DOUBLE_PRECISION = "float8";
public static final String PG_DOUBLE_PRECISION_ARRAY = "_float8";
public static final String PG_NUMERIC = "numeric";
Expand Down Expand Up @@ -284,11 +287,20 @@ private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws S
return DataTypes.DOUBLE();
case PG_DOUBLE_PRECISION_ARRAY:
return DataTypes.ARRAY(DataTypes.DOUBLE());
case PG_DECIMAL:
case PG_NUMERIC:
return DataTypes.DECIMAL(precision, metadata.getScale(colIndex));
// see SPARK-26538: handle numeric without explicit precision and scale.
if (precision > 0) {
return DataTypes.DECIMAL(precision, metadata.getScale(colIndex));
}
return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18);
case PG_DECIMAL_ARRAY:
case PG_NUMERIC_ARRAY:
return DataTypes.ARRAY(
DataTypes.DECIMAL(precision, metadata.getScale(colIndex)));
// see SPARK-26538: handle numeric without explicit precision and scale.
if (precision > 0) {
return DataTypes.ARRAY(DataTypes.DECIMAL(precision, metadata.getScale(colIndex)));
}
return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18));
case PG_CHAR:
case PG_CHARACTER:
return DataTypes.CHAR(precision);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void testPrimitiveTypes() throws Exception {
List<Row> results = TableUtils.collectToList(
tEnv.sqlQuery(String.format("select * from %s", TABLE_PRIMITIVE_TYPE)));

assertEquals("[1,[50],3,4,5.5,6.6,7.70000,true,a,b,c ,d,2016-06-22T19:10:25,2015-01-01,00:51:03]", results.toString());
assertEquals("[1,[50],3,4,5.5,6.6,7.70000,true,a,b,c ,d,2016-06-22T19:10:25,2015-01-01,00:51:03,500.000000000000000000]", results.toString());
}

@Test
Expand All @@ -103,6 +103,7 @@ public void testArrayTypes() throws Exception {
"[5.5, 6.6, 7.7]," +
"[6.6, 7.7, 8.8]," +
"[7.70000, 8.80000, 9.90000]," +
"[8.800000000000000000, 9.900000000000000000, 10.100000000000000000]," +
"[true, false, true]," +
"[a, b, c]," +
"[b, c, d]," +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.logical.DecimalType;

import com.opentable.db.postgres.junit.EmbeddedPostgresRules;
import com.opentable.db.postgres.junit.SingleInstancePostgresRule;
Expand Down Expand Up @@ -174,6 +175,7 @@ public static TestTable getPrimitiveTable() {
// .field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4))
.field("date", DataTypes.DATE())
.field("time", DataTypes.TIME(0))
.field("default_numeric", DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18))
.build(),
"int integer, " +
"bytea bytea, " +
Expand All @@ -190,7 +192,8 @@ public static TestTable getPrimitiveTable() {
"timestamp timestamp(5), " +
// "timestamptz timestamptz(4), " +
"date date," +
"time time(0)",
"time time(0), " +
"default_numeric numeric ",
"1," +
"'2'," +
"3," +
Expand All @@ -206,7 +209,8 @@ public static TestTable getPrimitiveTable() {
"'2016-06-22 19:10:25'," +
// "'2006-06-22 19:10:25'," +
"'2015-01-01'," +
"'00:51:02.746572'"
"'00:51:02.746572', " +
"500"
);
}

Expand All @@ -221,6 +225,7 @@ public static TestTable getArrayTable() {
.field("real_arr", DataTypes.ARRAY(DataTypes.FLOAT()))
.field("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE()))
.field("numeric_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 5)))
.field("numeric_arr_default", DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18)))
.field("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN()))
.field("text_arr", DataTypes.ARRAY(DataTypes.STRING()))
.field("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1)))
Expand All @@ -239,6 +244,7 @@ public static TestTable getArrayTable() {
"real_arr real[], " +
"double_precision_arr double precision[], " +
"numeric_arr numeric(10, 5)[], " +
"numeric_arr_default numeric[], " +
"boolean_arr boolean[], " +
"text_arr text[], " +
"char_arr char[], " +
Expand All @@ -256,6 +262,7 @@ public static TestTable getArrayTable() {
"'{5.5,6.6,7.7}'," +
"'{6.6,7.7,8.8}'," +
"'{7.7,8.8,9.9}'," +
"'{8.8,9.9,10.10}'," +
"'{true,false,true}'," +
"'{a,b,c}'," +
"'{b,c,d}'," +
Expand Down