Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static java.util.Optional.of;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.NANOS;
import static org.apache.parquet.schema.LogicalTypeAnnotation.dateType;
import static org.apache.parquet.schema.LogicalTypeAnnotation.decimalType;
import static org.apache.parquet.schema.LogicalTypeAnnotation.intType;
Expand Down Expand Up @@ -246,6 +247,8 @@ public TypeMapping visit(Time type) {
return primitive(INT32, timeType(false, MILLIS));
} else if (bitWidth == 64 && timeUnit == TimeUnit.MICROSECOND) {
return primitive(INT64, timeType(false, MICROS));
} else if (bitWidth == 64 && timeUnit == TimeUnit.NANOSECOND) {
return primitive(INT64, timeType(false, NANOS));
}
throw new UnsupportedOperationException("Unsupported type " + type);
}
Expand All @@ -257,6 +260,8 @@ public TypeMapping visit(Timestamp type) {
return primitive(INT64, timestampType(isUtcNormalized(type), MILLIS));
} else if (timeUnit == TimeUnit.MICROSECOND) {
return primitive(INT64, timestampType(isUtcNormalized(type), MICROS));
} else if (timeUnit == TimeUnit.NANOSECOND) {
return primitive(INT64, timestampType(isUtcNormalized(type), NANOS));
}
throw new UnsupportedOperationException("Unsupported type " + type);
}
Expand Down Expand Up @@ -460,6 +465,8 @@ public Optional<TypeMapping> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotatio
public Optional<TypeMapping> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
if (timeLogicalType.getUnit() == MICROS) {
return of(field(new ArrowType.Time(TimeUnit.MICROSECOND, 64)));
} else if (timeLogicalType.getUnit() == NANOS) {
return of(field(new ArrowType.Time(TimeUnit.NANOSECOND, 64)));
}
return empty();
}
Expand All @@ -471,6 +478,8 @@ public Optional<TypeMapping> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnn
return of(field(new ArrowType.Timestamp(TimeUnit.MICROSECOND, getTimeZone(timestampLogicalType))));
case MILLIS:
return of(field(new ArrowType.Timestamp(TimeUnit.MILLISECOND, getTimeZone(timestampLogicalType))));
case NANOS:
return of(field(new ArrowType.Timestamp(TimeUnit.NANOSECOND, getTimeZone(timestampLogicalType))));
}
return empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.util.Arrays.asList;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.NANOS;
import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType;
import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType;
import static org.apache.parquet.schema.OriginalType.DATE;
Expand Down Expand Up @@ -66,6 +67,7 @@
import org.apache.parquet.arrow.schema.SchemaMapping.TypeMappingVisitor;
import org.apache.parquet.arrow.schema.SchemaMapping.UnionTypeMapping;
import org.apache.parquet.example.Paper;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import org.junit.Assert;
Expand Down Expand Up @@ -93,6 +95,7 @@ private static Field field(String name, ArrowType type, Field... children) {
field("f", new ArrowType.FixedSizeList(1), field(null, new ArrowType.Date(DateUnit.DAY))),
field("g", new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)),
field("h", new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")),
field("i", new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC")),
field("j", new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)),
field("k", new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")),
field("l", new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)),
Expand All @@ -112,6 +115,7 @@ private static Field field(String name, ArrowType type, Field... children) {
.named("f"))
.addField(Types.optional(FLOAT).named("g"))
.addField(Types.optional(INT64).as(timestampType(true, MILLIS)).named("h"))
.addField(Types.optional(INT64).as(timestampType(true, NANOS)).named("i"))
.addField(Types.optional(INT64).as(timestampType(false, MILLIS)).named("j"))
.addField(Types.optional(INT64).as(timestampType(true, MICROS)).named("k"))
.addField(Types.optional(INT64).as(timestampType(false, MICROS)).named("l"))
Expand Down Expand Up @@ -144,8 +148,10 @@ private static Field field(String name, ArrowType type, Field... children) {
field("m", new ArrowType.Time(TimeUnit.MILLISECOND, 32)),
field("n", new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")),
field("o", new ArrowType.Interval(IntervalUnit.DAY_TIME)),
field("o1", new ArrowType.Interval(IntervalUnit.YEAR_MONTH))
));
field("o1", new ArrowType.Interval(IntervalUnit.YEAR_MONTH)),
field("p", new ArrowType.Time(TimeUnit.NANOSECOND, 64)),
field("q", new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"))
));

private final MessageType allTypesParquetSchema = Types.buildMessage()
.addField(Types.optional(BINARY).named("a"))
Expand Down Expand Up @@ -182,6 +188,8 @@ private static Field field(String name, ArrowType type, Field... children) {
.addField(Types.optional(INT64).as(TIMESTAMP_MILLIS).named("n"))
.addField(Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("o"))
.addField(Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("o1"))
.addField(Types.optional(INT64).as(timeType(false, NANOS)).named("p"))
.addField(Types.optional(INT64).as(timestampType(true, NANOS)).named("q"))
.named("root");

private final Schema supportedTypesArrowSchema = new Schema(asList(
Expand All @@ -205,7 +213,9 @@ private static Field field(String name, ArrowType type, Field... children) {
field("j2", new ArrowType.Decimal(25, 5)),
field("k", new ArrowType.Date(DateUnit.DAY)),
field("l", new ArrowType.Time(TimeUnit.MILLISECOND, 32)),
field("m", new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"))
field("m", new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")),
field("n", new ArrowType.Time(TimeUnit.NANOSECOND, 64)),
field("o", new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"))
));

private final MessageType supportedTypesParquetSchema = Types.buildMessage()
Expand Down Expand Up @@ -234,6 +244,8 @@ private static Field field(String name, ArrowType type, Field... children) {
.addField(Types.optional(INT32).as(DATE).named("k"))
.addField(Types.optional(INT32).as(TIME_MILLIS).named("l"))
.addField(Types.optional(INT64).as(TIMESTAMP_MILLIS).named("m"))
.addField(Types.optional(INT64).as(timeType(true, NANOS)).named("n"))
.addField(Types.optional(INT64).as(timestampType(true, NANOS)).named("o"))
.named("root");

private final Schema paperArrowSchema = new Schema(asList(
Expand Down Expand Up @@ -307,7 +319,7 @@ private void compareFields(List<Field> left, List<Field> right) {
@Test
public void testAllMap() throws IOException {
SchemaMapping map = converter.map(allTypesArrowSchema, allTypesParquetSchema);
Assert.assertEquals("p, s<p>, l<p>, l<p>, u<p>, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p", toSummaryString(map));
Assert.assertEquals("p, s<p>, l<p>, l<p>, u<p>, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p, p", toSummaryString(map));
}

private String toSummaryString(SchemaMapping map) {
Expand Down Expand Up @@ -387,13 +399,6 @@ public void testArrowTimeMicrosecondToParquet() {
Types.buildMessage().addField(Types.optional(INT64).as(timeType(false, MICROS)).named("a")).named("root"));
}

@Test(expected = UnsupportedOperationException.class)
public void testArrowTimeNanosecondToParquet() {
converter.fromArrow(new Schema(asList(
field("a", new ArrowType.Time(TimeUnit.NANOSECOND, 64))
))).getParquetSchema();
}

@Test
public void testParquetInt32TimeMillisToArrow() {
MessageType parquet = Types.buildMessage()
Expand Down Expand Up @@ -449,13 +454,6 @@ public void testArrowTimestampMicrosecondToParquet() {
Assert.assertEquals(expected, Types.buildMessage().addField(Types.optional(INT64).as(TIMESTAMP_MICROS).named("a")).named("root"));
}

@Test(expected = UnsupportedOperationException.class)
public void testArrowTimestampNanosecondToParquet() {
converter.fromArrow(new Schema(asList(
field("a", new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"))
))).getParquetSchema();
}

@Test
public void testParquetInt64TimestampMillisToArrow() {
MessageType parquet = Types.buildMessage()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,8 @@ PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {

public enum TimeUnit {
MILLIS,
MICROS
MICROS,
NANOS
}

public static class TimeLogicalTypeAnnotation extends LogicalTypeAnnotation {
Expand All @@ -540,7 +541,7 @@ public OriginalType toOriginalType() {
case MICROS:
return OriginalType.TIME_MICROS;
default:
throw new RuntimeException("Unknown original type for " + unit);
return null;
}
}

Expand Down Expand Up @@ -610,7 +611,7 @@ public OriginalType toOriginalType() {
case MICROS:
return OriginalType.TIMESTAMP_MICROS;
default:
throw new RuntimeException("Unknown original type for " + unit);
return null;
}
}

Expand Down Expand Up @@ -664,6 +665,8 @@ PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
return PrimitiveStringifier.TIMESTAMP_MICROS_STRINGIFIER;
case MILLIS:
return PrimitiveStringifier.TIMESTAMP_MILLIS_STRINGIFIER;
case NANOS:
return PrimitiveStringifier.TIMESTAMP_NANOS_STRINGIFIER;
default:
return super.valueStringifier(primitiveType);
}
Expand Down Expand Up @@ -697,7 +700,7 @@ public OriginalType toOriginalType() {
case 64:
return isSigned ? OriginalType.INT_64 : OriginalType.UINT_64;
default:
throw new RuntimeException("Unknown original type " + toOriginalType());
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

import java.math.BigDecimal;
Expand Down Expand Up @@ -306,6 +307,19 @@ long toMillis(long value) {
}
};

static final PrimitiveStringifier TIMESTAMP_NANOS_STRINGIFIER = new DateStringifier(
"TIMESTAMP_NANOS_STRINGIFIER", "yyyy-MM-dd'T'HH:mm:ss.SSS") {
@Override
public String stringify(long value) {
return super.stringify(value) + String.format("%06d", Math.abs(value % 1_000_000));
}

@Override
long toMillis(long value) {
return value / 1_000_000;
}
};

static final PrimitiveStringifier TIME_STRINGIFIER = new PrimitiveStringifier("TIME_STRINGIFIER") {
@Override
public String stringify(int millis) {
Expand All @@ -331,6 +345,26 @@ private long convert(long duration, TimeUnit from, TimeUnit to, TimeUnit higher)
}
};

static final PrimitiveStringifier TIME_NANOS_STRINGIFIER = new PrimitiveStringifier("TIME_NANOS_STRINGIFIER") {
@Override
public String stringify(long nanos) {
return toTimeString(nanos, NANOSECONDS);
}

private String toTimeString(long nanos, TimeUnit unit) {
String format = "%02d:%02d:%02d.%09d";
return String.format(format,
unit.toHours(nanos),
convert(nanos, unit, MINUTES, HOURS),
convert(nanos, unit, SECONDS, MINUTES),
convert(nanos, unit, unit, SECONDS));
}

private long convert(long duration, TimeUnit from, TimeUnit to, TimeUnit higher) {
return Math.abs(to.convert(duration, from) % to.convert(1, higher));
}
};

static PrimitiveStringifier createDecimalStringifier(final int scale) {
return new BinaryStringifierBase("DECIMAL_STRINGIFIER(scale: " + scale + ")") {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ public Optional<Boolean> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation t
checkInt32PrimitiveType(timeLogicalType);
break;
case MICROS:
case NANOS:
checkInt64PrimitiveType(timeLogicalType);
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.parquet.parser;

import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.NANOS;
import static org.apache.parquet.schema.LogicalTypeAnnotation.intType;
import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType;
Expand Down Expand Up @@ -271,7 +272,9 @@ public void testTimeAnnotations() {
" required int64 timestamp (TIMESTAMP_MILLIS);" +
" required FIXED_LEN_BYTE_ARRAY(12) interval (INTERVAL);" +
" required int32 newTime (TIME(MILLIS,true));" +
" required int64 nanoTime (TIME(NANOS,true));" +
" required int64 newTimestamp (TIMESTAMP(MILLIS,false));" +
" required int64 nanoTimestamp (TIMESTAMP(NANOS,false));" +
"}\n";

MessageType parsed = MessageTypeParser.parseMessageType(message);
Expand All @@ -281,7 +284,9 @@ public void testTimeAnnotations() {
.required(INT64).as(TIMESTAMP_MILLIS).named("timestamp")
.required(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("interval")
.required(INT32).as(timeType(true, MILLIS)).named("newTime")
.required(INT64).as(timeType(true, NANOS)).named("nanoTime")
.required(INT64).as(timestampType(false, MILLIS)).named("newTimestamp")
.required(INT64).as(timestampType(false, NANOS)).named("nanoTimestamp")
.named("TimeMessage");

assertEquals(expected, parsed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.parquet.schema.PrimitiveStringifier.DATE_STRINGIFIER;
import static org.apache.parquet.schema.PrimitiveStringifier.DEFAULT_STRINGIFIER;
import static org.apache.parquet.schema.PrimitiveStringifier.INTERVAL_STRINGIFIER;
import static org.apache.parquet.schema.PrimitiveStringifier.TIME_NANOS_STRINGIFIER;
import static org.apache.parquet.schema.PrimitiveStringifier.TIME_STRINGIFIER;
import static org.apache.parquet.schema.PrimitiveStringifier.UNSIGNED_STRINGIFIER;
import static org.apache.parquet.schema.PrimitiveStringifier.UTF8_STRINGIFIER;
Expand Down Expand Up @@ -200,6 +202,28 @@ public void testTimestampMicrosStringifier() {
checkThrowingUnsupportedException(stringifier, Long.TYPE);
}

@Test
public void testTimestampNanosStringifier() {
PrimitiveStringifier stringifier = PrimitiveStringifier.TIMESTAMP_NANOS_STRINGIFIER;

assertEquals("1970-01-01T00:00:00.000000000", stringifier.stringify(0l));

Calendar cal = Calendar.getInstance(UTC);
cal.clear();
cal.set(2053, Calendar.JULY, 10, 22, 13, 24);
cal.set(Calendar.MILLISECOND, 84);
long nanos = cal.getTimeInMillis() * 1_000_000 + 536;
assertEquals("2053-07-10T22:13:24.084000536", stringifier.stringify(nanos));

cal.clear();
cal.set(1848, Calendar.MARCH, 15, 9, 23, 59);
cal.set(Calendar.MILLISECOND, 765);
nanos = cal.getTimeInMillis() * 1_000_000 - 1;
assertEquals("1848-03-15T09:23:59.765000001", stringifier.stringify(nanos));

checkThrowingUnsupportedException(stringifier, Long.TYPE);
}

@Test
public void testTimeStringifier() {
PrimitiveStringifier stringifier = TIME_STRINGIFIER;
Expand All @@ -222,6 +246,20 @@ public void testTimeStringifier() {
checkThrowingUnsupportedException(stringifier, Integer.TYPE, Long.TYPE);
}

@Test
public void testTimeNanoStringifier() {
PrimitiveStringifier stringifier = TIME_NANOS_STRINGIFIER;

assertEquals("00:00:00.000000000", stringifier.stringify(0l));

assertEquals("12:34:56.789012987", stringifier.stringify(convert(NANOSECONDS, 12, 34, 56, 789012987)));
assertEquals("-12:34:56.000789012", stringifier.stringify(convert(NANOSECONDS, -12, -34, -56, -789012)));
assertEquals("12345:12:34.000056789", stringifier.stringify(convert(NANOSECONDS, 12345, 12, 34, 56789)));
assertEquals("-12345:12:34.000056789", stringifier.stringify(convert(NANOSECONDS, -12345, -12, -34, -56789)));

checkThrowingUnsupportedException(stringifier, Integer.TYPE, Long.TYPE);
}

private long convert(TimeUnit unit, long hours, long minutes, long seconds, long rest) {
return unit.convert(hours, HOURS) + unit.convert(minutes, MINUTES) + unit.convert(seconds, SECONDS) + rest;
}
Expand Down
Loading