diff --git a/presto-parquet/src/main/java/io/prestosql/parquet/predicate/TupleDomainParquetPredicate.java b/presto-parquet/src/main/java/io/prestosql/parquet/predicate/TupleDomainParquetPredicate.java index 11769215190a..b0cf50edc0db 100644 --- a/presto-parquet/src/main/java/io/prestosql/parquet/predicate/TupleDomainParquetPredicate.java +++ b/presto-parquet/src/main/java/io/prestosql/parquet/predicate/TupleDomainParquetPredicate.java @@ -52,11 +52,14 @@ import static io.prestosql.spi.type.IntegerType.INTEGER; import static io.prestosql.spi.type.RealType.REAL; import static io.prestosql.spi.type.SmallintType.SMALLINT; +import static io.prestosql.spi.type.TimestampType.TIMESTAMP; import static io.prestosql.spi.type.TinyintType.TINYINT; import static io.prestosql.spi.type.Varchars.isVarcharType; import static java.lang.Float.floatToRawIntBits; import static java.lang.String.format; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS; public class TupleDomainParquetPredicate implements Predicate @@ -229,6 +232,23 @@ public static Domain getDomain(Type type, long rowCount, Statistics statistic return createDomain(type, hasNullValue, parquetIntegerStatistics); } + if (type.equals(TIMESTAMP) && statistics instanceof LongStatistics) { + LongStatistics longStatistics = (LongStatistics) statistics; + if (longStatistics.genericGetMin() > longStatistics.genericGetMax()) { + failWithCorruptionException(failOnCorruptedParquetStatistics, column, id, longStatistics); + return Domain.create(ValueSet.all(type), hasNullValue); + } + + long min = longStatistics.getMin(); + long max = longStatistics.getMax(); + + ParquetIntegerStatistics parquetIntegerStatistics = + statistics.type().getOriginalType() == TIMESTAMP_MICROS + ? new ParquetIntegerStatistics(MICROSECONDS.toMillis(min), MICROSECONDS.toMillis(max)) + : new ParquetIntegerStatistics(min, max); + return createDomain(type, hasNullValue, parquetIntegerStatistics); + } + return Domain.create(ValueSet.all(type), hasNullValue); } diff --git a/presto-parquet/src/test/java/io/prestosql/parquet/TestTupleDomainParquetPredicate.java b/presto-parquet/src/test/java/io/prestosql/parquet/TestTupleDomainParquetPredicate.java index 36cfc15de3fe..a04ffeb653b8 100644 --- a/presto-parquet/src/test/java/io/prestosql/parquet/TestTupleDomainParquetPredicate.java +++ b/presto-parquet/src/test/java/io/prestosql/parquet/TestTupleDomainParquetPredicate.java @@ -32,6 +32,7 @@ import org.apache.parquet.column.statistics.LongStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -56,6 +57,7 @@ import static io.prestosql.spi.type.IntegerType.INTEGER; import static io.prestosql.spi.type.RealType.REAL; import static io.prestosql.spi.type.SmallintType.SMALLINT; +import static io.prestosql.spi.type.TimestampType.TIMESTAMP; import static io.prestosql.spi.type.TinyintType.TINYINT; import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType; import static io.prestosql.spi.type.VarcharType.createVarcharType; @@ -68,6 +70,7 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; import static org.apache.parquet.schema.Type.Repetition.OPTIONAL; +import static org.apache.parquet.schema.Type.Repetition.REQUIRED; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -271,6 +274,34 @@ public void testDate() .withMessage("Corrupted statistics for column \"DateColumn\" in Parquet file \"testFile\": [min: 200, max: 100, num_nulls: 0]"); } + @Test + public void testTimestamp() + throws ParquetCorruptionException + { + String column = "TimestampColumn"; + assertEquals(getDomain(TIMESTAMP, 0, null, ID, column, true), all(TIMESTAMP)); + assertEquals(getDomain(TIMESTAMP, 10, timestampColumnStats(1574026172386L, 1574026172386L, OriginalType.TIMESTAMP_MILLIS), ID, column, true), singleValue(TIMESTAMP, 1574026172386L)); + assertEquals(getDomain(TIMESTAMP, 10, timestampColumnStats(1546318800000L, 1574026172386L, OriginalType.TIMESTAMP_MILLIS), ID, column, true), create(ValueSet.ofRanges(range(TIMESTAMP, 1546318800000L, true, 1574026172386L, true)), false)); + + // ignore corrupted statistics + assertEquals(getDomain(TIMESTAMP, 10, timestampColumnStats(1574026172386L, 1546318800000L, OriginalType.TIMESTAMP_MILLIS), ID, column, false), create(ValueSet.all(TIMESTAMP), false)); + // fail on corrupted statistics + assertThatExceptionOfType(ParquetCorruptionException.class) + .isThrownBy(() -> getDomain(TIMESTAMP, 10, timestampColumnStats(1574026172386L, 1546318800000L, OriginalType.TIMESTAMP_MILLIS), ID, column, true)) + .withMessage("Corrupted statistics for column \"TimestampColumn\" in Parquet file \"testFile\": [min: 2019-11-17T21:29:32.386, max: 2019-01-01T05:00:00.000, num_nulls: 0]"); + + assertEquals(getDomain(TIMESTAMP, 0, null, ID, column, true), all(TIMESTAMP)); + assertEquals(getDomain(TIMESTAMP, 10, timestampColumnStats(1574026172386123L, 1574026172386123L, OriginalType.TIMESTAMP_MICROS), ID, column, true), singleValue(TIMESTAMP, 1574026172386L)); + assertEquals(getDomain(TIMESTAMP, 10, timestampColumnStats(1546318800000123L, 1574026172386123L, OriginalType.TIMESTAMP_MICROS), ID, column, true), create(ValueSet.ofRanges(range(TIMESTAMP, 1546318800000L, true, 1574026172386L, true)), false)); + + // ignore corrupted statistics + assertEquals(getDomain(TIMESTAMP, 10, timestampColumnStats(1574026172386000L, 1546318800000000L, OriginalType.TIMESTAMP_MICROS), ID, column, false), create(ValueSet.all(TIMESTAMP), false)); + // fail on corrupted statistics + assertThatExceptionOfType(ParquetCorruptionException.class) + .isThrownBy(() -> getDomain(TIMESTAMP, 10, timestampColumnStats(1574026172386123L, 1546318800000123L, OriginalType.TIMESTAMP_MICROS), ID, column, true)) + .withMessage("Corrupted statistics for column \"TimestampColumn\" in Parquet file \"testFile\": [min: 2019-11-17T21:29:32.386123, max: 2019-01-01T05:00:00.000123, num_nulls: 0]"); + } + @Test public void testVarcharMatchesWithStatistics() throws ParquetCorruptionException @@ -369,4 +400,11 @@ private static LongStatistics longColumnStats(long minimum, long maximum) statistics.setMinMax(minimum, maximum); return statistics; } + + private static LongStatistics timestampColumnStats(long minimum, long maximum, OriginalType type) + { + LongStatistics statistics = (LongStatistics) Statistics.createStats(new PrimitiveType(REQUIRED, INT64, "timestampColumn", type)); + statistics.setMinMax(minimum, maximum); + return statistics; + } }