diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java
index 058331a44cf4..87e47f5961e3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java
@@ -157,7 +157,6 @@ public Instant toInputType(@NonNull Row base) {
         maxSubseconds,
         precision,
         subseconds);
-
     return Instant.ofEpochSecond(
         checkArgumentNotNull(
             base.getInt64(0), "While trying to convert to Instant: Row missing seconds field"),
diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
index 38621571ca1d..882e46208a96 100644
--- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
+++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java
@@ -27,6 +27,7 @@
 import java.io.ObjectOutputStream;
 import java.lang.reflect.Method;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -81,6 +82,7 @@
 import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
 import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
 import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
 import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
@@ -137,6 +139,7 @@
  *   LogicalTypes.TimestampMillis <-----> DATETIME
  *   LogicalTypes.TimestampMicros ------> Long
  *   LogicalTypes.TimestampMicros <------ LogicalType(urn="beam:logical_type:micros_instant:v1")
+ *   LogicalTypes.TimestampNanos <------> LogicalType(TIMESTAMP(9))
  *   LogicalTypes.Decimal <-----> DECIMAL
  *
@@ -164,6 +167,8 @@ public class AvroUtils {
 
   private static final GenericData GENERIC_DATA_WITH_DEFAULT_CONVERSIONS;
 
+  private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos";
+
   static {
     GENERIC_DATA_WITH_DEFAULT_CONVERSIONS = new GenericData();
     addLogicalTypeConversions(GENERIC_DATA_WITH_DEFAULT_CONVERSIONS);
@@ -1027,6 +1032,11 @@ private static FieldType toFieldType(TypeWithNullability type) {
         fieldType = FieldType.DATETIME;
       }
     }
+    // TODO: Remove once Avro 1.12+ has timestamp-nanos
+    if (fieldType == null
+        && TIMESTAMP_NANOS_LOGICAL_TYPE.equals(avroSchema.getProp("logicalType"))) {
+      fieldType = FieldType.logicalType(Timestamp.NANOS);
+    }
 
     if (fieldType == null) {
       switch (type.type.getType()) {
@@ -1186,6 +1196,14 @@ private static org.apache.avro.Schema getFieldSchema(
     } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
       baseType =
           LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG));
+    } else if (Timestamp.IDENTIFIER.equals(identifier)) {
+      int precision = checkNotNull(logicalType.getArgument());
+      if (precision != 9) {
+        throw new RuntimeException(
+            "Timestamp logical type precision not supported: " + precision);
+      }
+      baseType = org.apache.avro.Schema.create(Type.LONG);
+      baseType.addProp("logicalType", TIMESTAMP_NANOS_LOGICAL_TYPE);
     } else {
       throw new RuntimeException(
           "Unhandled logical type " + checkNotNull(fieldType.getLogicalType()).getIdentifier());
@@ -1340,6 +1358,16 @@ private static org.apache.avro.Schema getFieldSchema(
       java.time.Instant instant = (java.time.Instant) value;
       return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
           + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
+    } else if (Timestamp.IDENTIFIER.equals(identifier)) {
+      java.time.Instant instant = (java.time.Instant) value;
+      // Use BigInteger to work around long overflow so that epochNanos = Long.MIN_VALUE can be
+      // supported. Instant always stores nanos as a positive adjustment, so the math would
+      // silently overflow with a regular int64.
+      BigInteger epochSeconds = BigInteger.valueOf(instant.getEpochSecond());
+      BigInteger nanosOfSecond = BigInteger.valueOf(instant.getNano());
+      BigInteger epochNanos =
+          epochSeconds.multiply(BigInteger.valueOf(1_000_000_000L)).add(nanosOfSecond);
+      return epochNanos.longValueExact();
     } else {
       throw new RuntimeException("Unhandled logical type " + identifier);
     }
@@ -1387,6 +1415,24 @@ private static Object convertLogicalType(
       @Nonnull FieldType fieldType,
       @Nonnull GenericData genericData) {
     TypeWithNullability type = new TypeWithNullability(avroSchema);
+
+    // TODO: Remove this workaround once Avro is upgraded to 1.12+, which has timestamp-nanos.
+    if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.type.getProp("logicalType"))) {
+      if (type.type.getType() == Type.LONG) {
+        Long nanos = (Long) value;
+        // Check if Beam expects the Timestamp logical type
+        if (fieldType.getTypeName() == TypeName.LOGICAL_TYPE
+            && org.apache.beam.sdk.schemas.logicaltypes.Timestamp.IDENTIFIER.equals(
+                fieldType.getLogicalType().getIdentifier())) {
+          long seconds = Math.floorDiv(nanos, 1_000_000_000L);
+          long nanoAdjustment = Math.floorMod(nanos, 1_000_000_000L);
+          return java.time.Instant.ofEpochSecond(seconds, nanoAdjustment);
+        } else {
+          return nanos;
+        }
+      }
+    }
+
     LogicalType logicalType = LogicalTypes.fromSchema(type.type);
     if (logicalType == null) {
       return null;
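Note on the BigInteger detour in the value-conversion hunk above: java.time.Instant normalizes every value to (epochSecond, nanoAdjustment) with the adjustment always in [0, 1_000_000_000), so for epochNanos = Long.MIN_VALUE the seconds component alone multiplies past the int64 range even though the final sum fits. Plain long math wraps silently; BigInteger plus longValueExact() fails loudly for instants that genuinely do not fit. A minimal self-contained sketch of both directions of the conversion — class and method names here are illustrative, not part of the patch:

import java.math.BigInteger;
import java.time.Instant;

public class EpochNanosRoundTrip {
  private static final BigInteger NANOS_PER_SECOND = BigInteger.valueOf(1_000_000_000L);

  // Write side, as in the patch: widen to BigInteger so longValueExact() can reject
  // instants outside the int64 epoch-nanos range instead of silently wrapping.
  static long toEpochNanos(Instant instant) {
    return BigInteger.valueOf(instant.getEpochSecond())
        .multiply(NANOS_PER_SECOND)
        .add(BigInteger.valueOf(instant.getNano()))
        .longValueExact();
  }

  // Read side, as in the patch: floorDiv/floorMod keep the nano adjustment in [0, 1e9).
  static Instant fromEpochNanos(long epochNanos) {
    return Instant.ofEpochSecond(
        Math.floorDiv(epochNanos, 1_000_000_000L), Math.floorMod(epochNanos, 1_000_000_000L));
  }

  public static void main(String[] args) {
    // Long.MIN_VALUE round-trips: Instant stores it as (-9_223_372_037 s, +145_224_192 ns).
    System.out.println(toEpochNanos(fromEpochNanos(Long.MIN_VALUE)) == Long.MIN_VALUE); // true

    // One nano past the representable maximum must fail rather than wrap to Long.MIN_VALUE.
    Instant tooLate = fromEpochNanos(Long.MAX_VALUE).plusNanos(1);
    long naive = tooLate.getEpochSecond() * 1_000_000_000L + tooLate.getNano();
    System.out.println(naive); // -9223372036854775808, a silent wraparound
    try {
      toEpochNanos(tooLate);
    } catch (ArithmeticException expected) {
      System.out.println("rejected: outside int64 epoch-nanos range");
    }
  }
}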
diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
index 41a43ed850b7..d087ed0a20bc 100644
--- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
+++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java
@@ -54,6 +54,7 @@
 import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
 import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -549,6 +550,88 @@ public void testFromBeamSchema() {
     assertEquals(getAvroSchema(), avroSchema);
   }
 
+  @Test
+  public void testBeamTimestampNanosLogicalTypeToAvroSchema() {
+    Schema beamSchema =
+        Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
+
+    // Expected Avro schema with timestamp-nanos
+    String expectedJson =
+        "{\"type\": \"record\", \"name\": \"topLevelRecord\", "
+            + "\"fields\": [{\"name\": \"timestampNanos\", "
+            + "\"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}}]}";
+
+    org.apache.avro.Schema expectedAvroSchema =
+        new org.apache.avro.Schema.Parser().parse(expectedJson);
+
+    assertEquals(expectedAvroSchema, AvroUtils.toAvroSchema(beamSchema));
+  }
+
+  @Test
+  public void testBeamTimestampNanosToGenericRecord() {
+    Schema beamSchema =
+        Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
+
+    java.time.Instant instant = java.time.Instant.parse("2000-01-01T01:02:03.123456789Z");
+    Row beamRow = Row.withSchema(beamSchema).addValue(instant).build();
+
+    // Expected nanos since epoch
+    long expectedNanos = TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano();
+
+    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
+    GenericRecord avroRecord = AvroUtils.toGenericRecord(beamRow, avroSchema);
+
+    assertEquals(expectedNanos, avroRecord.get("timestampNanos"));
+  }
+
+  @Test
+  public void testTimestampNanosRoundTrip() {
+    Schema beamSchema =
+        Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
+
+    // Test various nanosecond precisions
+    java.time.Instant[] testInstants = {
+      java.time.Instant.parse("2000-01-01T00:00:00.000000001Z"), // 1 nano
+      java.time.Instant.parse("2000-01-01T00:00:00.123456789Z"), // full nanos
+      java.time.Instant.parse("2000-01-01T00:00:00.999999999Z"), // max nanos
+      java.time.Instant.ofEpochSecond(0L, Long.MAX_VALUE), // max epoch nanos an int64 supports
+      java.time.Instant.parse("1677-09-21T00:12:43.145224192Z"), // min epoch nanos an int64 supports
+    };
+
+    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
+
+    for (java.time.Instant instant : testInstants) {
+      Row originalRow = Row.withSchema(beamSchema).addValue(instant).build();
+      GenericRecord avroRecord = AvroUtils.toGenericRecord(originalRow, avroSchema);
+      Row roundTripRow = AvroUtils.toBeamRowStrict(avroRecord, beamSchema);
+
+      assertEquals(originalRow, roundTripRow);
+      java.time.Instant roundTripInstant =
+          (java.time.Instant) roundTripRow.getValue("timestampNanos");
+      assertEquals(instant, roundTripInstant);
+    }
+  }
+
+  @Test
+  public void testTimestampNanosAvroSchemaToBeamSchema() {
+    List<org.apache.avro.Schema.Field> fields = Lists.newArrayList();
+    fields.add(
+        new org.apache.avro.Schema.Field(
+            "timestampNanos",
+            new org.apache.avro.Schema.Parser()
+                .parse("{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}"),
+            "",
+            (Object) null));
+    org.apache.avro.Schema avroSchema =
+        org.apache.avro.Schema.createRecord("test", null, null, false, fields);
+
+    Schema beamSchema = AvroUtils.toBeamSchema(avroSchema);
+
+    Schema expected =
+        Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
+    assertEquals(expected, beamSchema);
+  }
+
   @Test
   public void testAvroSchemaFromBeamSchemaCanBeParsed() {
     org.apache.avro.Schema convertedSchema = AvroUtils.toAvroSchema(getBeamSchema());
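The expectedJson in the first test shows the wire-level encoding: until the Avro dependency includes LogicalTypes.timestampNanos() (Avro 1.12+), the schema is a plain LONG carrying a "timestamp-nanos" string prop, exactly as getFieldSchema emits it. For readers who want to build that schema directly against the public Avro API, a sketch — the class name is made up for illustration:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class TimestampNanosSchemaSketch {
  public static void main(String[] args) {
    // Equivalent of the patch's getFieldSchema branch: a bare long with the
    // logicalType prop attached by hand, since this Avro version has no
    // LogicalTypes.timestampNanos() helper yet.
    Schema nanosLong = Schema.create(Schema.Type.LONG);
    nanosLong.addProp("logicalType", "timestamp-nanos");

    Schema record =
        SchemaBuilder.record("topLevelRecord")
            .fields()
            .name("timestampNanos")
            .type(nanosLong)
            .noDefault()
            .endRecord();

    // Should print a schema equivalent to expectedJson in the test above.
    System.out.println(record.toString(true));
  }
}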
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index c169a0571b79..b5243a8110b7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -50,8 +50,6 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
 import org.checkerframework.checker.nullness.qual.Nullable;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
 
 /** A set of utilities for working with Avro files. */
 class BigQueryAvroUtils {
@@ -60,7 +58,7 @@ class BigQueryAvroUtils {
       Optional.ofNullable(Schema.class.getPackage())
           .map(Package::getImplementationVersion)
          .orElse("");
-
+  private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos";
   // org.apache.avro.LogicalType
   static class DateTimeLogicalType extends LogicalType {
     public DateTimeLogicalType() {
@@ -161,36 +159,73 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTypes) {
   /**
    * Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and
    * immutable.
    */
-  private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
-      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();
-
-  @VisibleForTesting
-  static String formatTimestamp(Long timestampMicro) {
-    String dateTime = formatDatetime(timestampMicro);
-    return dateTime + " UTC";
+  private static final java.time.format.DateTimeFormatter DATE_TIME_FORMATTER =
+      java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
+          .withZone(java.time.ZoneOffset.UTC);
+
+  /** Enum to define the precision of a timestamp since the epoch. */
+  enum TimestampPrecision {
+    MILLISECONDS,
+    MICROSECONDS,
+    NANOSECONDS;
+
+    /** Converts an epoch value of this precision to an Instant. */
+    java.time.Instant toInstant(long epochValue) {
+      switch (this) {
+        case MILLISECONDS:
+          return java.time.Instant.ofEpochMilli(epochValue);
+        case MICROSECONDS:
+          {
+            long seconds = Math.floorDiv(epochValue, 1_000_000L);
+            long microsOfSecond = Math.floorMod(epochValue, 1_000_000L);
+            return java.time.Instant.ofEpochSecond(seconds, microsOfSecond * 1_000L);
+          }
+        case NANOSECONDS:
+          {
+            long seconds = Math.floorDiv(epochValue, 1_000_000_000L);
+            long nanosOfSecond = Math.floorMod(epochValue, 1_000_000_000L);
+            return java.time.Instant.ofEpochSecond(seconds, nanosOfSecond);
+          }
+        default:
+          throw new IllegalStateException("Unknown precision: " + this);
+      }
+    }
   }
 
+  /**
+   * Formats an Instant with minimal fractional-second precision. Shows 0, 3, 6, or 9 decimal
+   * places based on the actual precision of the value.
+   */
   @VisibleForTesting
-  static String formatDatetime(Long timestampMicro) {
-    // timestampMicro is in "microseconds since epoch" format,
-    // e.g., 1452062291123456L means "2016-01-06 06:38:11.123456 UTC".
-    // Separate into seconds and microseconds.
-    long timestampSec = timestampMicro / 1_000_000;
-    long micros = timestampMicro % 1_000_000;
-    if (micros < 0) {
-      micros += 1_000_000;
-      timestampSec -= 1;
-    }
-    String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(timestampSec * 1000);
-    if (micros == 0) {
-      return dayAndTime;
-    } else if (micros % 1000 == 0) {
-      return String.format("%s.%03d", dayAndTime, micros / 1000);
+  @SuppressWarnings("JavaInstantGetSecondsGetNano")
+  static String formatDatetime(java.time.Instant instant) {
+    String dateTime = DATE_TIME_FORMATTER.format(instant);
+    int nanos = instant.getNano();
+
+    if (nanos == 0) {
+      return dateTime;
+    } else if (nanos % 1_000_000 == 0) {
+      return dateTime + String.format(".%03d", nanos / 1_000_000);
+    } else if (nanos % 1_000 == 0) {
+      return dateTime + String.format(".%06d", nanos / 1_000);
     } else {
-      return String.format("%s.%06d", dayAndTime, micros);
+      return dateTime + String.format(".%09d", nanos);
     }
   }
 
+  @VisibleForTesting
+  static String formatDatetime(long epochValue, TimestampPrecision precision) {
+    return formatDatetime(precision.toInstant(epochValue));
+  }
+
+  static String formatTimestamp(java.time.Instant instant) {
+    return formatDatetime(instant) + " UTC";
+  }
+
+  static String formatTimestamp(long epochValue, TimestampPrecision precision) {
+    return formatTimestamp(precision.toInstant(epochValue));
+  }
+
   /**
    * This method formats a BigQuery DATE value into a String matching the format used by JSON
    * export. Date records are stored in "days since epoch" format, and BigQuery uses the proleptic
@@ -335,7 +370,6 @@ private static Object convertRequiredField(String name, Schema schema, Object v) {
     // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
     // INTEGER type maps to an Avro LONG type.
     checkNotNull(v, "REQUIRED field %s should not be null", name);
-
     Type type = schema.getType();
     LogicalType logicalType = schema.getLogicalType();
     switch (type) {
@@ -364,21 +398,26 @@ private static Object convertRequiredField(String name, Schema schema, Object v) {
       } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
         // Write only: SQL type TIMESTAMP
         // ideally Instant but TableRowJsonCoder encodes as String
-        return formatTimestamp((Long) v * 1000L);
+        return formatTimestamp((Long) v, TimestampPrecision.MILLISECONDS);
       } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
         // SQL type TIMESTAMP
         // ideally Instant but TableRowJsonCoder encodes as String
-        return formatTimestamp((Long) v);
+        return formatTimestamp((Long) v, TimestampPrecision.MICROSECONDS);
+        // TODO: Use LogicalTypes.TimestampNanos once the Avro version is updated.
+      } else if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(schema.getProp("logicalType"))) {
+        // SQL type TIMESTAMP
+        // ideally Instant but TableRowJsonCoder encodes as String
+        return formatTimestamp((Long) v, TimestampPrecision.NANOSECONDS);
       } else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
           && logicalType instanceof LogicalTypes.LocalTimestampMillis) {
         // Write only: SQL type DATETIME
         // ideally LocalDateTime but TableRowJsonCoder encodes as String
-        return formatDatetime(((Long) v) * 1000);
+        return formatDatetime((Long) v, TimestampPrecision.MILLISECONDS);
       } else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
          && logicalType instanceof LogicalTypes.LocalTimestampMicros) {
         // Write only: SQL type DATETIME
         // ideally LocalDateTime but TableRowJsonCoder encodes as String
-        return formatDatetime((Long) v);
+        return formatDatetime((Long) v, TimestampPrecision.MICROSECONDS);
       } else {
         // SQL type INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
         // ideally Long if in [2^53+1, 2^53-1] but keep consistency with BQ JSON export that uses
@@ -602,6 +641,11 @@ private static TableFieldSchema typedTableFieldSchema(Schema type, Boolean useAvroLogicalTypes) {
         return fieldSchema.setType("INTEGER");
       }
     case LONG:
+      // TODO: Use LogicalTypes.TimestampNanos once the Avro version is updated.
+      if (useAvroLogicalTypes
+          && TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getProp("logicalType"))) {
+        return fieldSchema.setType("TIMESTAMP");
+      }
       if (logicalType instanceof LogicalTypes.TimeMicros) {
         return fieldSchema.setType("TIME");
       } else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
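Two details of the new formatting code are easiest to see in isolation: floorDiv/floorMod split a signed epoch count so the sub-second part stays non-negative (which is why pre-epoch values like -1 micros format as 23:59:59.999999 rather than something nonsensical), and the fraction is printed with the coarsest of 0, 3, 6, or 9 digits that loses nothing. A standalone sketch of both rules, with illustrative names only:

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class FormatSketch {
  private static final DateTimeFormatter SECONDS =
      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

  // Same split as TimestampPrecision.MICROSECONDS.toInstant: floor math keeps the
  // sub-second remainder in [0, 1_000_000) even for negative epoch values.
  static Instant microsToInstant(long micros) {
    return Instant.ofEpochSecond(
        Math.floorDiv(micros, 1_000_000L), Math.floorMod(micros, 1_000_000L) * 1_000L);
  }

  // Same width rule as formatDatetime: 0, 3, 6, or 9 fractional digits, whichever
  // coarsest width still represents the value exactly.
  static String format(Instant instant) {
    String base = SECONDS.format(instant);
    int nanos = instant.getNano();
    if (nanos == 0) {
      return base;
    } else if (nanos % 1_000_000 == 0) {
      return base + String.format(".%03d", nanos / 1_000_000);
    } else if (nanos % 1_000 == 0) {
      return base + String.format(".%06d", nanos / 1_000);
    }
    return base + String.format(".%09d", nanos);
  }

  public static void main(String[] args) {
    System.out.println(format(microsToInstant(-1L))); // 1969-12-31 23:59:59.999999
    System.out.println(format(microsToInstant(-100_000L))); // 1969-12-31 23:59:59.900
    System.out.println(format(microsToInstant(1_452_062_291_123_456L))); // 2016-01-06 06:38:11.123456
  }
}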
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
index 9b752055d011..e95e15465966 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
@@ -280,6 +280,30 @@ public void testConvertGenericRecordToTableRow() {
       assertEquals(expected, row.clone());
     }
 
+    {
+      // timestamp-nanos
+      // TODO: Use LogicalTypes.TimestampNanos once the Avro version is updated.
+      String timestampNanosJson = "{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}";
+      Schema timestampType = new Schema.Parser().parse(timestampNanosJson);
+
+      // 2000-01-01 01:02:03.123456789 UTC
+      LocalDate date = LocalDate.of(2000, 1, 1);
+      LocalTime time = LocalTime.of(1, 2, 3, 123456789);
+      LocalDateTime ts = LocalDateTime.of(date, time);
+      long seconds = ts.toInstant(ZoneOffset.UTC).getEpochSecond();
+      int nanos = ts.toInstant(ZoneOffset.UTC).getNano();
+      long totalNanos = seconds * 1_000_000_000L + nanos;
+      GenericRecord record =
+          new GenericRecordBuilder(avroSchema(f -> f.type(timestampType).noDefault()))
+              .set("value", totalNanos)
+              .build();
+      TableRow expected = new TableRow().set("value", "2000-01-01 01:02:03.123456789 UTC");
+      TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record);
+
+      assertEquals(expected, row);
+      assertEquals(expected, row.clone());
+    }
+
     {
       // timestamp-micros
       LogicalType lt = LogicalTypes.timestampMillis();
@@ -923,6 +947,19 @@ public void testConvertAvroSchemaToBigQuerySchema() {
       assertEquals(expectedRaw, BigQueryAvroUtils.fromGenericAvroSchema(avroSchema, false));
     }
 
+    {
+      // timestamp-nanos
+      // TODO: Use LogicalTypes.TimestampNanos once the Avro version is updated.
+      String timestampNanosJson = "{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}";
+      Schema timestampType = new Schema.Parser().parse(timestampNanosJson);
+      Schema avroSchema = avroSchema(f -> f.type(timestampType).noDefault());
+      TableSchema expected = tableSchema(f -> f.setType("TIMESTAMP").setMode("REQUIRED"));
+      TableSchema expectedRaw = tableSchema(f -> f.setType("INTEGER").setMode("REQUIRED"));
+
+      assertEquals(expected, BigQueryAvroUtils.fromGenericAvroSchema(avroSchema));
+      assertEquals(expectedRaw, BigQueryAvroUtils.fromGenericAvroSchema(avroSchema, false));
+    }
+
     {
       // string prop: sqlType=GEOGRAPHY
       Schema avroSchema =
@@ -978,39 +1015,138 @@ public void testConvertAvroSchemaToBigQuerySchema() {
   }
 
   @Test
-  public void testFormatTimestamp() {
-    long micros = 1452062291123456L;
+  public void testFormatTimestampInputMillis() {
+    // Min: Earliest timestamp supported by BQ
+    // https://docs.cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp_type
+    long minMillis = -62135596800000L;
+    String expectedMin = "0001-01-01 00:00:00";
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(
+            minMillis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS),
+        equalTo(expectedMin));
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(
+            minMillis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS),
+        equalTo(expectedMin + " UTC"));
+
+    // Existing: Regular timestamp
+    long millis = 1452062291123L;
+    String expected = "2016-01-06 06:38:11.123";
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(millis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS),
+        equalTo(expected));
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(
+            millis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS),
+        equalTo(expected + " UTC"));
+
+    // Max: Latest timestamp supported by BQ
+    long maxMillis = 253402300799999L;
+    String expectedMax = "9999-12-31 23:59:59.999";
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(
+            maxMillis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS),
+        equalTo(expectedMax));
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(
+            maxMillis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS),
+        equalTo(expectedMax + " UTC"));
+  }
+
+  @Test
+  public void testFormatTimestampInputMicros() {
+    // Min: Earliest timestamp supported by BQ
+    long minMicros = -62_135_596_800_000_000L;
+    String expectedMin = "0001-01-01 00:00:00";
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(
+            minMicros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS),
+        equalTo(expectedMin));
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(
+            minMicros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS),
+        equalTo(expectedMin + " UTC"));
+
+    long micros = 1_452_062_291_123_456L;
     String expected = "2016-01-06 06:38:11.123456";
-    assertThat(BigQueryAvroUtils.formatDatetime(micros), equalTo(expected));
-    assertThat(BigQueryAvroUtils.formatTimestamp(micros), equalTo(expected + " UTC"));
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(micros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS),
+        equalTo(expected));
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(
+            micros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS),
+        equalTo(expected + " UTC"));
+
+    // Max: Latest timestamp supported by BQ
+    long maxMicros = 253_402_300_799_999_000L;
+    String expectedMax = "9999-12-31 23:59:59.999";
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(
+            maxMicros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS),
+        equalTo(expectedMax));
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(
+            maxMicros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS),
+        equalTo(expectedMax + " UTC"));
   }
 
   @Test
-  public void testFormatTimestampMillis() {
-    long millis = 1452062291123L;
-    long micros = millis * 1000L;
-    String expected = "2016-01-06 06:38:11.123";
-    assertThat(BigQueryAvroUtils.formatDatetime(micros), equalTo(expected));
-    assertThat(BigQueryAvroUtils.formatTimestamp(micros), equalTo(expected + " UTC"));
+  public void testFormatTimestampInputNanos() {
+    long minNanos = Long.MIN_VALUE; // -9223372036854775808L
+    String expectedMin = "1677-09-21 00:12:43.145224192";
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(
+            minNanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS),
+        equalTo(expectedMin));
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(
+            minNanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS),
+        equalTo(expectedMin + " UTC"));
+
+    long nanos = 1452062291123456789L;
+    String expected = "2016-01-06 06:38:11.123456789";
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(nanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS),
+        equalTo(expected));
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(nanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS),
+        equalTo(expected + " UTC"));
+
+    long maxNanos = Long.MAX_VALUE; // 9223372036854775807L
+    String expectedMax = "2262-04-11 23:47:16.854775807";
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(
+            maxNanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS),
+        equalTo(expectedMax));
+    assertThat(
+        BigQueryAvroUtils.formatTimestamp(
+            maxNanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS),
+        equalTo(expectedMax + " UTC"));
   }
 
   @Test
-  public void testFormatTimestampSeconds() {
+  public void testFormatTimestampInputMicrosOutputSecondsFormat() {
+    BigQueryAvroUtils.TimestampPrecision precision =
+        BigQueryAvroUtils.TimestampPrecision.MICROSECONDS;
     long seconds = 1452062291L;
     long micros = seconds * 1000L * 1000L;
     String expected = "2016-01-06 06:38:11";
-    assertThat(BigQueryAvroUtils.formatDatetime(micros), equalTo(expected));
-    assertThat(BigQueryAvroUtils.formatTimestamp(micros), equalTo(expected + " UTC"));
+    assertThat(BigQueryAvroUtils.formatDatetime(micros, precision), equalTo(expected));
+    assertThat(BigQueryAvroUtils.formatTimestamp(micros, precision), equalTo(expected + " UTC"));
   }
 
   @Test
   public void testFormatTimestampNegative() {
-    assertThat(BigQueryAvroUtils.formatDatetime(-1L), equalTo("1969-12-31 23:59:59.999999"));
-    assertThat(BigQueryAvroUtils.formatDatetime(-100_000L), equalTo("1969-12-31 23:59:59.900"));
-    assertThat(BigQueryAvroUtils.formatDatetime(-1_000_000L), equalTo("1969-12-31 23:59:59"));
+    BigQueryAvroUtils.TimestampPrecision precision =
+        BigQueryAvroUtils.TimestampPrecision.MICROSECONDS;
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(-1L, precision), equalTo("1969-12-31 23:59:59.999999"));
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(-100_000L, precision),
+        equalTo("1969-12-31 23:59:59.900"));
+    assertThat(
+        BigQueryAvroUtils.formatDatetime(-1_000_000L, precision),
+        equalTo("1969-12-31 23:59:59"));
     // No leap seconds before 1972. 477 leap years from 1 through 1969.
     assertThat(
-        BigQueryAvroUtils.formatDatetime(-(1969L * 365 + 477) * 86400 * 1_000_000),
+        BigQueryAvroUtils.formatDatetime(-(1969L * 365 + 477) * 86400 * 1_000_000, precision),
         equalTo("0001-01-01 00:00:00"));
   }
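The boundary constants asserted in the new tests can be double-checked with java.time alone; a quick sanity sketch, not part of the patch:

import java.time.Instant;

public class BoundaryCheck {
  public static void main(String[] args) {
    // The int64 epoch-nanos range quoted in testFormatTimestampInputNanos.
    System.out.println(
        Instant.ofEpochSecond(
            Math.floorDiv(Long.MIN_VALUE, 1_000_000_000L),
            Math.floorMod(Long.MIN_VALUE, 1_000_000_000L))); // 1677-09-21T00:12:43.145224192Z
    System.out.println(
        Instant.ofEpochSecond(
            Math.floorDiv(Long.MAX_VALUE, 1_000_000_000L),
            Math.floorMod(Long.MAX_VALUE, 1_000_000_000L))); // 2262-04-11T23:47:16.854775807Z

    // The BigQuery TIMESTAMP range quoted in the millis tests.
    System.out.println(Instant.parse("0001-01-01T00:00:00Z").toEpochMilli()); // -62135596800000
    System.out.println(Instant.parse("9999-12-31T23:59:59.999Z").toEpochMilli()); // 253402300799999
  }
}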