From 1d0a0ef8308ddd6c689ed0ce3cb0895c2d076c82 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 24 Nov 2025 17:28:14 -0500 Subject: [PATCH 1/7] Add timestamp-nanos avro logical type support in bigquery utils. --- .../avro/schemas/utils/AvroUtils.java | 34 +++++++ .../avro/schemas/utils/AvroUtilsTest.java | 81 ++++++++++++++++ .../io/gcp/bigquery/BigQueryAvroUtils.java | 95 +++++++++++++----- .../gcp/bigquery/BigQueryAvroUtilsTest.java | 97 ++++++++++++++++--- 4 files changed, 269 insertions(+), 38 deletions(-) diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index 38621571ca1d..5395be9e93e4 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -81,6 +81,7 @@ import org.apache.beam.sdk.schemas.logicaltypes.FixedString; import org.apache.beam.sdk.schemas.logicaltypes.OneOfType; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes; import org.apache.beam.sdk.schemas.logicaltypes.VariableString; import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType; @@ -137,6 +138,7 @@ * LogicalTypes.TimestampMillis <-----> DATETIME * LogicalTypes.TimestampMicros ------> Long * LogicalTypes.TimestampMicros <------ LogicalType(urn="beam:logical_type:micros_instant:v1") + * LogicalTypes.TimestampNanos <------> LogicalType(TIMESTAMP(9)) * LogicalTypes.Decimal <-----> DECIMAL * * @@ -164,6 +166,8 @@ public class AvroUtils { private static final GenericData GENERIC_DATA_WITH_DEFAULT_CONVERSIONS; + private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos"; + static { GENERIC_DATA_WITH_DEFAULT_CONVERSIONS = new GenericData(); addLogicalTypeConversions(GENERIC_DATA_WITH_DEFAULT_CONVERSIONS); @@ -1027,6 +1031,11 @@ private static FieldType toFieldType(TypeWithNullability type) { fieldType = FieldType.DATETIME; } } + // TODO: Remove once Avro 1.12+ has timestamp-nanos + if (fieldType == null + && TIMESTAMP_NANOS_LOGICAL_TYPE.equals(avroSchema.getProp("logicalType"))) { + fieldType = FieldType.logicalType(Timestamp.NANOS); + } if (fieldType == null) { switch (type.type.getType()) { @@ -1186,6 +1195,10 @@ private static org.apache.avro.Schema getFieldSchema( } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) { baseType = LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG)); + } else if (Timestamp.IDENTIFIER.equals(identifier)) { + baseType = + new org.apache.avro.Schema.Parser() + .parse("{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}"); } else { throw new RuntimeException( "Unhandled logical type " + checkNotNull(fieldType.getLogicalType()).getIdentifier()); @@ -1340,6 +1353,11 @@ private static org.apache.avro.Schema getFieldSchema( java.time.Instant instant = (java.time.Instant) value; return TimeUnit.SECONDS.toMicros(instant.getEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(instant.getNano()); + } else if (Timestamp.IDENTIFIER.equals(identifier)) { + java.time.Instant instant = (java.time.Instant) value; + long epochSeconds = instant.getEpochSecond(); + int nanoOfSecond = instant.getNano(); + return (epochSeconds * 1_000_000_000L) + nanoOfSecond; } else { 
throw new RuntimeException("Unhandled logical type " + identifier); } @@ -1387,6 +1405,22 @@ private static Object convertLogicalType( @Nonnull FieldType fieldType, @Nonnull GenericData genericData) { TypeWithNullability type = new TypeWithNullability(avroSchema); + + // TODO: Remove this workaround once Avro is upgraded to 1.12+ where timestamp-nanos + if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.type.getProp("logicalType"))) { + if (type.type.getType() == Type.LONG) { + Long nanos = (Long) value; + // Check if Beam expects Timestamp logical type + if (fieldType.getTypeName() == TypeName.LOGICAL_TYPE + && org.apache.beam.sdk.schemas.logicaltypes.Timestamp.IDENTIFIER.equals( + fieldType.getLogicalType().getIdentifier())) { + return java.time.Instant.ofEpochSecond(0L, nanos); + } else { + return nanos; + } + } + } + LogicalType logicalType = LogicalTypes.fromSchema(type.type); if (logicalType == null) { return null; diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java index 41a43ed850b7..f0179f6de5d3 100644 --- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java +++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java @@ -54,6 +54,7 @@ import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType; import org.apache.beam.sdk.schemas.logicaltypes.OneOfType; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.SimpleFunction; @@ -549,6 +550,86 @@ public void testFromBeamSchema() { assertEquals(getAvroSchema(), avroSchema); } + @Test + public void testBeamTimestampNanosLogicalTypeToAvroSchema() { + Schema beamSchema = + Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build(); + + // Expected Avro schema with timestamp-nanos + String expectedJson = + "{\"type\": \"record\", \"name\": \"topLevelRecord\", " + + "\"fields\": [{\"name\": \"timestampNanos\", " + + "\"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}}]}"; + + org.apache.avro.Schema expectedAvroSchema = + new org.apache.avro.Schema.Parser().parse(expectedJson); + + assertEquals(expectedAvroSchema, AvroUtils.toAvroSchema(beamSchema)); + } + + @Test + public void testBeamTimestampNanosToGenericRecord() { + Schema beamSchema = + Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build(); + + java.time.Instant instant = java.time.Instant.parse("2000-01-01T01:02:03.123456789Z"); + Row beamRow = Row.withSchema(beamSchema).addValue(instant).build(); + + // Expected nanos since epoch + long expectedNanos = TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano(); + + org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema); + GenericRecord avroRecord = AvroUtils.toGenericRecord(beamRow, avroSchema); + + assertEquals(expectedNanos, avroRecord.get("timestampNanos")); + } + + @Test + public void testTimestampNanosRoundTrip() { + Schema beamSchema = + Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build(); + + // Test various nanosecond precisions + java.time.Instant[] testInstants = { + 
java.time.Instant.parse("2000-01-01T00:00:00.000000001Z"), // 1 nano + java.time.Instant.parse("2000-01-01T00:00:00.123456789Z"), // full nanos + java.time.Instant.parse("2000-01-01T00:00:00.999999999Z"), // max nanos + }; + + org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema); + + for (java.time.Instant instant : testInstants) { + Row originalRow = Row.withSchema(beamSchema).addValue(instant).build(); + GenericRecord avroRecord = AvroUtils.toGenericRecord(originalRow, avroSchema); + Row roundTripRow = AvroUtils.toBeamRowStrict(avroRecord, beamSchema); + + assertEquals(originalRow, roundTripRow); + java.time.Instant roundTripInstant = + (java.time.Instant) roundTripRow.getValue("timestampNanos"); + assertEquals(instant, roundTripInstant); + } + } + + @Test + public void testTimestampNanosAvroSchemaToBeamSchema() { + List fields = Lists.newArrayList(); + fields.add( + new org.apache.avro.Schema.Field( + "timestampNanos", + new org.apache.avro.Schema.Parser() + .parse("{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}"), + "", + (Object) null)); + org.apache.avro.Schema avroSchema = + org.apache.avro.Schema.createRecord("test", null, null, false, fields); + + Schema beamSchema = AvroUtils.toBeamSchema(avroSchema); + + Schema expected = + Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build(); + assertEquals(expected, beamSchema); + } + @Test public void testAvroSchemaFromBeamSchemaCanBeParsed() { org.apache.avro.Schema convertedSchema = AvroUtils.toAvroSchema(getBeamSchema()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java index c169a0571b79..df93f256872e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java @@ -60,7 +60,7 @@ class BigQueryAvroUtils { Optional.ofNullable(Schema.class.getPackage()) .map(Package::getImplementationVersion) .orElse(""); - + private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos"; // org.apache.avro.LogicalType static class DateTimeLogicalType extends LogicalType { public DateTimeLogicalType() { @@ -164,33 +164,76 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC(); - @VisibleForTesting - static String formatTimestamp(Long timestampMicro) { - String dateTime = formatDatetime(timestampMicro); - return dateTime + " UTC"; + /** + * Enum to define the precision of a timestamp since the epoch. It provides methods to normalize + * any precision to seconds and nanoseconds. 
+ */ + enum TimestampPrecision { + MILLISECONDS(1_000L, 1_000_000L), + MICROSECONDS(1_000_000L, 1_000L), + NANOSECONDS(1_000_000_000L, 1L); + + private final long divisorForSeconds; + private final long nanoMultiplier; + + TimestampPrecision(long divisorForSeconds, long nanoMultiplier) { + this.divisorForSeconds = divisorForSeconds; + this.nanoMultiplier = nanoMultiplier; + } + + public long getDivisorForSeconds() { + return divisorForSeconds; + } + + public long toNanos(long fractionalPart) { + return fractionalPart * this.nanoMultiplier; + } + + public String formatFractional(long nanoOfSecond) { + if (nanoOfSecond % 1_000_000 == 0) { + return String.format(".%03d", nanoOfSecond / 1_000_000); + } else if (nanoOfSecond % 1000 == 0) { + return String.format(".%06d", nanoOfSecond / 1000); + } else { + return String.format(".%09d", nanoOfSecond); + } + } } + /** + * Formats a timestamp value with specified precision. + * + * @param timestamp The timestamp value in units specified by precision (milliseconds, + * microseconds, or nanoseconds since epoch) + * @param precision The precision of the input timestamp + * @return Formatted string in "yyyy-MM-dd HH:mm:ss[.fraction]" format + */ @VisibleForTesting - static String formatDatetime(Long timestampMicro) { - // timestampMicro is in "microseconds since epoch" format, - // e.g., 1452062291123456L means "2016-01-06 06:38:11.123456 UTC". - // Separate into seconds and microseconds. - long timestampSec = timestampMicro / 1_000_000; - long micros = timestampMicro % 1_000_000; - if (micros < 0) { - micros += 1_000_000; + static String formatDatetime(long timestamp, TimestampPrecision precision) { + long divisor = precision.getDivisorForSeconds(); + long timestampSec = timestamp / divisor; + long fractionalPart = timestamp % divisor; + + if (fractionalPart < 0) { + fractionalPart += divisor; timestampSec -= 1; } + String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(timestampSec * 1000); - if (micros == 0) { + + long nanoOfSecond = precision.toNanos(fractionalPart); + + if (nanoOfSecond == 0) { return dayAndTime; - } else if (micros % 1000 == 0) { - return String.format("%s.%03d", dayAndTime, micros / 1000); } else { - return String.format("%s.%06d", dayAndTime, micros); + return dayAndTime + precision.formatFractional(nanoOfSecond); } } + static String formatTimestamp(long timestamp, TimestampPrecision precision) { + return formatDatetime(timestamp, precision) + " UTC"; + } + /** * This method formats a BigQuery DATE value into a String matching the format used by JSON * export. Date records are stored in "days since epoch" format, and BigQuery uses the proleptic @@ -335,7 +378,6 @@ private static Object convertRequiredField(String name, Schema schema, Object v) // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery // INTEGER type maps to an Avro LONG type. 
checkNotNull(v, "REQUIRED field %s should not be null", name); - Type type = schema.getType(); LogicalType logicalType = schema.getLogicalType(); switch (type) { @@ -364,21 +406,26 @@ private static Object convertRequiredField(String name, Schema schema, Object v) } else if (logicalType instanceof LogicalTypes.TimestampMillis) { // Write only: SQL type TIMESTAMP // ideally Instant but TableRowJsonCoder encodes as String - return formatTimestamp((Long) v * 1000L); + return formatTimestamp((Long) v, TimestampPrecision.MILLISECONDS); } else if (logicalType instanceof LogicalTypes.TimestampMicros) { // SQL type TIMESTAMP // ideally Instant but TableRowJsonCoder encodes as String - return formatTimestamp((Long) v); + return formatTimestamp((Long) v, TimestampPrecision.MICROSECONDS); + // TODO: Use LogicalTypes.TimestampNanos once avro version is updated. + } else if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(schema.getProp("logicalType"))) { + // SQL type TIMESTAMP + // ideally Instant but TableRowJsonCoder encodes as String + return formatTimestamp((Long) v, TimestampPrecision.NANOSECONDS); } else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9")) && logicalType instanceof LogicalTypes.LocalTimestampMillis) { // Write only: SQL type DATETIME // ideally LocalDateTime but TableRowJsonCoder encodes as String - return formatDatetime(((Long) v) * 1000); + return formatDatetime(((Long) v), TimestampPrecision.MILLISECONDS); } else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9")) && logicalType instanceof LogicalTypes.LocalTimestampMicros) { // Write only: SQL type DATETIME // ideally LocalDateTime but TableRowJsonCoder encodes as String - return formatDatetime((Long) v); + return formatDatetime((Long) v, TimestampPrecision.MICROSECONDS); } else { // SQL type INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT) // ideally Long if in [2^53+1, 2^53-1] but keep consistency with BQ JSON export that uses @@ -602,6 +649,10 @@ private static TableFieldSchema typedTableFieldSchema(Schema type, Boolean useAv return fieldSchema.setType("INTEGER"); } case LONG: + // TODO: Use LogicalTypes.TimestampNanos once avro version is updated. + if (useAvroLogicalTypes && (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getProp("logicalType")))) { + return fieldSchema.setType("TIMESTAMP"); + } if (logicalType instanceof LogicalTypes.TimeMicros) { return fieldSchema.setType("TIME"); } else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9")) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java index 9b752055d011..9391dbd58b39 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java @@ -280,6 +280,30 @@ public void testConvertGenericRecordToTableRow() { assertEquals(expected, row.clone()); } + { + // timestamp-nanos + // TODO: Use LogicalTypes.TimestampNanos once avro version is updated. 
+ String timestampNanosJson = "{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}"; + Schema timestampType = new Schema.Parser().parse(timestampNanosJson); + + // 2000-01-01 01:02:03.123456789 UTC + LocalDate date = LocalDate.of(2000, 1, 1); + LocalTime time = LocalTime.of(1, 2, 3, 123456789); + LocalDateTime ts = LocalDateTime.of(date, time); + long seconds = ts.toInstant(ZoneOffset.UTC).getEpochSecond(); + int nanos = ts.toInstant(ZoneOffset.UTC).getNano(); + long totalNanos = seconds * 1_000_000_000L + nanos; + GenericRecord record = + new GenericRecordBuilder(avroSchema(f -> f.type(timestampType).noDefault())) + .set("value", totalNanos) + .build(); + TableRow expected = new TableRow().set("value", "2000-01-01 01:02:03.123456789 UTC"); + TableRow row = BigQueryAvroUtils.convertGenericRecordToTableRow(record); + + assertEquals(expected, row); + assertEquals(expected, row.clone()); + } + { // timestamp-micros LogicalType lt = LogicalTypes.timestampMillis(); @@ -923,6 +947,19 @@ public void testConvertAvroSchemaToBigQuerySchema() { assertEquals(expectedRaw, BigQueryAvroUtils.fromGenericAvroSchema(avroSchema, false)); } + { + // timestamp-nanos + // TODO: Use LogicalTypes.TimestampNanos once avro version is updated. + String timestampNanosJson = "{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}"; + Schema timestampType = new Schema.Parser().parse(timestampNanosJson); + Schema avroSchema = avroSchema(f -> f.type(timestampType).noDefault()); + TableSchema expected = tableSchema(f -> f.setType("TIMESTAMP").setMode("REQUIRED")); + TableSchema expectedRaw = tableSchema(f -> f.setType("INTEGER").setMode("REQUIRED")); + + assertEquals(expected, BigQueryAvroUtils.fromGenericAvroSchema(avroSchema)); + assertEquals(expectedRaw, BigQueryAvroUtils.fromGenericAvroSchema(avroSchema, false)); + } + { // string prop: sqlType=GEOGRAPHY Schema avroSchema = @@ -978,39 +1015,67 @@ public void testConvertAvroSchemaToBigQuerySchema() { } @Test - public void testFormatTimestamp() { + public void testFormatTimestampInputMillis() { + long millis = 1452062291123L; + String expected = "2016-01-06 06:38:11.123"; + assertThat( + BigQueryAvroUtils.formatDatetime(millis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS), + equalTo(expected)); + assertThat( + BigQueryAvroUtils.formatTimestamp( + millis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS), + equalTo(expected + " UTC")); + } + + @Test + public void testFormatTimestampInputMicros() { long micros = 1452062291123456L; String expected = "2016-01-06 06:38:11.123456"; - assertThat(BigQueryAvroUtils.formatDatetime(micros), equalTo(expected)); - assertThat(BigQueryAvroUtils.formatTimestamp(micros), equalTo(expected + " UTC")); + assertThat( + BigQueryAvroUtils.formatDatetime(micros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS), + equalTo(expected)); + assertThat( + BigQueryAvroUtils.formatTimestamp( + micros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS), + equalTo(expected + " UTC")); } @Test - public void testFormatTimestampMillis() { - long millis = 1452062291123L; - long micros = millis * 1000L; - String expected = "2016-01-06 06:38:11.123"; - assertThat(BigQueryAvroUtils.formatDatetime(micros), equalTo(expected)); - assertThat(BigQueryAvroUtils.formatTimestamp(micros), equalTo(expected + " UTC")); + public void testFormatTimestampInputNanos() { + long nanos = 1452062291123456789L; + String expected = "2016-01-06 06:38:11.123456789"; + assertThat( + BigQueryAvroUtils.formatDatetime(nanos, 
BigQueryAvroUtils.TimestampPrecision.NANOSECONDS), + equalTo(expected)); + assertThat( + BigQueryAvroUtils.formatTimestamp(nanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS), + equalTo(expected + " UTC")); } @Test - public void testFormatTimestampSeconds() { + public void testFormatTimestampInputMicrosOutputSecondsFormat() { + BigQueryAvroUtils.TimestampPrecision precision = + BigQueryAvroUtils.TimestampPrecision.MICROSECONDS; long seconds = 1452062291L; long micros = seconds * 1000L * 1000L; String expected = "2016-01-06 06:38:11"; - assertThat(BigQueryAvroUtils.formatDatetime(micros), equalTo(expected)); - assertThat(BigQueryAvroUtils.formatTimestamp(micros), equalTo(expected + " UTC")); + assertThat(BigQueryAvroUtils.formatDatetime(micros, precision), equalTo(expected)); + assertThat(BigQueryAvroUtils.formatTimestamp(micros, precision), equalTo(expected + " UTC")); } @Test public void testFormatTimestampNegative() { - assertThat(BigQueryAvroUtils.formatDatetime(-1L), equalTo("1969-12-31 23:59:59.999999")); - assertThat(BigQueryAvroUtils.formatDatetime(-100_000L), equalTo("1969-12-31 23:59:59.900")); - assertThat(BigQueryAvroUtils.formatDatetime(-1_000_000L), equalTo("1969-12-31 23:59:59")); + BigQueryAvroUtils.TimestampPrecision precision = + BigQueryAvroUtils.TimestampPrecision.MICROSECONDS; + assertThat( + BigQueryAvroUtils.formatDatetime(-1L, precision), equalTo("1969-12-31 23:59:59.999999")); + assertThat( + BigQueryAvroUtils.formatDatetime(-100_000L, precision), equalTo("1969-12-31 23:59:59.900")); + assertThat( + BigQueryAvroUtils.formatDatetime(-1_000_000L, precision), equalTo("1969-12-31 23:59:59")); // No leap seconds before 1972. 477 leap years from 1 through 1969. assertThat( - BigQueryAvroUtils.formatDatetime(-(1969L * 365 + 477) * 86400 * 1_000_000), + BigQueryAvroUtils.formatDatetime(-(1969L * 365 + 477) * 86400 * 1_000_000, precision), equalTo("0001-01-01 00:00:00")); } From e8c61c5764af6730888e4a93258f74a8e61b7f9a Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 25 Nov 2025 14:54:57 -0500 Subject: [PATCH 2/7] Lint. --- .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java index df93f256872e..16eaffd9092d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java @@ -650,7 +650,8 @@ private static TableFieldSchema typedTableFieldSchema(Schema type, Boolean useAv } case LONG: // TODO: Use LogicalTypes.TimestampNanos once avro version is updated. - if (useAvroLogicalTypes && (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getProp("logicalType")))) { + if (useAvroLogicalTypes + && (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getProp("logicalType")))) { return fieldSchema.setType("TIMESTAMP"); } if (logicalType instanceof LogicalTypes.TimeMicros) { From 93e7d2440a95972ee39cbaa093d043f6ad534c73 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 25 Nov 2025 17:29:01 -0500 Subject: [PATCH 3/7] Avoid overflows for millis/micros. 
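
Context for the BigInteger conversion this change adds in AvroUtils (an
illustrative sketch, not part of the patch): java.time.Instant stores the
nano-of-second as a non-negative adjustment, so for instants near the lower
int64-nanosecond bound the intermediate getEpochSecond() * 1_000_000_000L
product alone no longer fits in a long. Going through BigInteger with
longValueExact() keeps in-range conversions exact and fails loudly for
out-of-range instants instead of silently wrapping.

    import java.math.BigInteger;
    import java.time.Instant;

    class EpochNanosSketch {
      // Converts an Instant to nanoseconds since epoch, throwing if the result
      // does not fit in a long.
      static long toEpochNanos(Instant instant) {
        return BigInteger.valueOf(instant.getEpochSecond())
            .multiply(BigInteger.valueOf(1_000_000_000L))
            .add(BigInteger.valueOf(instant.getNano()))
            .longValueExact();
      }

      public static void main(String[] args) {
        // Long.MIN_VALUE nanoseconds; Instant normalizes it to
        // seconds = -9_223_372_037 and nanoOfSecond = +145_224_192.
        System.out.println(toEpochNanos(Instant.parse("1677-09-21T00:12:43.145224192Z")));
        // Instants outside the int64 nanosecond range throw ArithmeticException
        // instead of wrapping, e.g.:
        // toEpochNanos(Instant.MAX);
      }
    }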
--- .../sdk/schemas/logicaltypes/Timestamp.java | 1 - .../avro/schemas/utils/AvroUtils.java | 11 +- .../avro/schemas/utils/AvroUtilsTest.java | 2 + .../io/gcp/bigquery/BigQueryAvroUtils.java | 105 ++++++++---------- .../gcp/bigquery/BigQueryAvroUtilsTest.java | 72 +++++++++++- 5 files changed, 127 insertions(+), 64 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java index 058331a44cf4..87e47f5961e3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java @@ -157,7 +157,6 @@ public Instant toInputType(@NonNull Row base) { maxSubseconds, precision, subseconds); - return Instant.ofEpochSecond( checkArgumentNotNull( base.getInt64(0), "While trying to convert to Instant: Row missing seconds field"), diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index 5395be9e93e4..d2a2dc49e13c 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -27,6 +27,7 @@ import java.io.ObjectOutputStream; import java.lang.reflect.Method; import java.math.BigDecimal; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -1355,9 +1356,13 @@ private static org.apache.avro.Schema getFieldSchema( + TimeUnit.NANOSECONDS.toMicros(instant.getNano()); } else if (Timestamp.IDENTIFIER.equals(identifier)) { java.time.Instant instant = (java.time.Instant) value; - long epochSeconds = instant.getEpochSecond(); - int nanoOfSecond = instant.getNano(); - return (epochSeconds * 1_000_000_000L) + nanoOfSecond; + // Use BigInteger to work around long overflows so that minimum timestamp can be + // supported. 
+ BigInteger epochSeconds = BigInteger.valueOf(instant.getEpochSecond()); + BigInteger nanosOfSecond = BigInteger.valueOf(instant.getNano()); + BigInteger epochNanos = + epochSeconds.multiply(BigInteger.valueOf(1_000_000_000L)).add(nanosOfSecond); + return epochNanos.longValueExact(); } else { throw new RuntimeException("Unhandled logical type " + identifier); } diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java index f0179f6de5d3..d4b46e2b4484 100644 --- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java +++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java @@ -594,6 +594,8 @@ public void testTimestampNanosRoundTrip() { java.time.Instant.parse("2000-01-01T00:00:00.000000001Z"), // 1 nano java.time.Instant.parse("2000-01-01T00:00:00.123456789Z"), // full nanos java.time.Instant.parse("2000-01-01T00:00:00.999999999Z"), // max nanos + java.time.Instant.ofEpochSecond(0L, Long.MAX_VALUE), // max supported + java.time.Instant.ofEpochSecond(0L, Long.MIN_VALUE), // min supported }; org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java index 16eaffd9092d..960027f0b67d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java @@ -50,8 +50,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding; import org.checkerframework.checker.nullness.qual.Nullable; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; /** A set of utilities for working with Avro files. */ class BigQueryAvroUtils { @@ -161,77 +159,66 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy * Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and * immutable. */ - private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER = - DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC(); + private static final java.time.format.DateTimeFormatter DATE_TIME_FORMATTER = + java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss") + .withZone(java.time.ZoneOffset.UTC); - /** - * Enum to define the precision of a timestamp since the epoch. It provides methods to normalize - * any precision to seconds and nanoseconds. - */ + /** Enum to define the precision of a timestamp since the epoch. 
*/ enum TimestampPrecision { - MILLISECONDS(1_000L, 1_000_000L), - MICROSECONDS(1_000_000L, 1_000L), - NANOSECONDS(1_000_000_000L, 1L); - - private final long divisorForSeconds; - private final long nanoMultiplier; - - TimestampPrecision(long divisorForSeconds, long nanoMultiplier) { - this.divisorForSeconds = divisorForSeconds; - this.nanoMultiplier = nanoMultiplier; - } - - public long getDivisorForSeconds() { - return divisorForSeconds; - } - - public long toNanos(long fractionalPart) { - return fractionalPart * this.nanoMultiplier; - } - - public String formatFractional(long nanoOfSecond) { - if (nanoOfSecond % 1_000_000 == 0) { - return String.format(".%03d", nanoOfSecond / 1_000_000); - } else if (nanoOfSecond % 1000 == 0) { - return String.format(".%06d", nanoOfSecond / 1000); - } else { - return String.format(".%09d", nanoOfSecond); + MILLISECONDS, + MICROSECONDS, + NANOSECONDS; + + /** Converts an epoch value of this precision to an Instant. */ + java.time.Instant toInstant(long epochValue) { + switch (this) { + case MILLISECONDS: + return java.time.Instant.ofEpochMilli(epochValue); + case MICROSECONDS: + return java.time.Instant.ofEpochSecond( + epochValue / 1_000_000L, (epochValue % 1_000_000L) * 1_000L); + case NANOSECONDS: + return java.time.Instant.ofEpochSecond( + epochValue / 1_000_000_000L, epochValue % 1_000_000_000L); + default: + throw new IllegalStateException("Unknown precision: " + this); } } } /** - * Formats a timestamp value with specified precision. - * - * @param timestamp The timestamp value in units specified by precision (milliseconds, - * microseconds, or nanoseconds since epoch) - * @param precision The precision of the input timestamp - * @return Formatted string in "yyyy-MM-dd HH:mm:ss[.fraction]" format + * Formats an Instant with minimal fractional second precision. Shows 0, 3, 6, or 9 decimal places + * based on actual precision of the value. 
*/ @VisibleForTesting - static String formatDatetime(long timestamp, TimestampPrecision precision) { - long divisor = precision.getDivisorForSeconds(); - long timestampSec = timestamp / divisor; - long fractionalPart = timestamp % divisor; - - if (fractionalPart < 0) { - fractionalPart += divisor; - timestampSec -= 1; + @SuppressWarnings("JavaInstantGetSecondsGetNano") + static String formatDatetime(java.time.Instant instant) { + System.out.println(instant); + String dateTime = DATE_TIME_FORMATTER.format(instant); + int nanos = instant.getNano(); + + if (nanos == 0) { + return dateTime; + } else if (nanos % 1_000_000 == 0) { + return dateTime + String.format(".%03d", nanos / 1_000_000); + } else if (nanos % 1_000 == 0) { + return dateTime + String.format(".%06d", nanos / 1_000); + } else { + return dateTime + String.format(".%09d", nanos); } + } - String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(timestampSec * 1000); - - long nanoOfSecond = precision.toNanos(fractionalPart); + @VisibleForTesting + static String formatDatetime(long epochValue, TimestampPrecision precision) { + return formatDatetime(precision.toInstant(epochValue)); + } - if (nanoOfSecond == 0) { - return dayAndTime; - } else { - return dayAndTime + precision.formatFractional(nanoOfSecond); - } + static String formatTimestamp(java.time.Instant instant) { + return formatDatetime(instant) + " UTC"; } - static String formatTimestamp(long timestamp, TimestampPrecision precision) { - return formatDatetime(timestamp, precision) + " UTC"; + static String formatTimestamp(long epochValue, TimestampPrecision precision) { + return formatTimestamp(precision.toInstant(epochValue)); } /** diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java index 9391dbd58b39..5a0c35eca2fe 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java @@ -1016,6 +1016,19 @@ public void testConvertAvroSchemaToBigQuerySchema() { @Test public void testFormatTimestampInputMillis() { + // Min: Earliest timestamp supported by BQ + long minMillis = -62135596800000L; + String expectedMin = "0001-01-01 00:00:00"; + assertThat( + BigQueryAvroUtils.formatDatetime( + minMillis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS), + equalTo(expectedMin)); + assertThat( + BigQueryAvroUtils.formatTimestamp( + minMillis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS), + equalTo(expectedMin + " UTC")); + + // Existing: Regular timestamp long millis = 1452062291123L; String expected = "2016-01-06 06:38:11.123"; assertThat( @@ -1025,11 +1038,34 @@ public void testFormatTimestampInputMillis() { BigQueryAvroUtils.formatTimestamp( millis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS), equalTo(expected + " UTC")); + + // Max: Latest timestamp supported by BQ + long maxMillis = 253402300799999L; + String expectedMax = "9999-12-31 23:59:59.999"; + assertThat( + BigQueryAvroUtils.formatDatetime( + maxMillis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS), + equalTo(expectedMax)); + assertThat( + BigQueryAvroUtils.formatTimestamp( + maxMillis, BigQueryAvroUtils.TimestampPrecision.MILLISECONDS), + equalTo(expectedMax + " UTC")); } @Test public void testFormatTimestampInputMicros() { - long micros = 
1452062291123456L; + long minMicro = -62_135_596_800_000_000L; + String expectedMin = "0001-01-01 00:00:00"; + assertThat( + BigQueryAvroUtils.formatDatetime( + minMicro, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS), + equalTo(expectedMin)); + assertThat( + BigQueryAvroUtils.formatTimestamp( + minMicro, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS), + equalTo(expectedMin + " UTC")); + + long micros = 1452_062_291_123_456L; String expected = "2016-01-06 06:38:11.123456"; assertThat( BigQueryAvroUtils.formatDatetime(micros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS), @@ -1038,10 +1074,33 @@ public void testFormatTimestampInputMicros() { BigQueryAvroUtils.formatTimestamp( micros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS), equalTo(expected + " UTC")); + + // Max: Latest timestamp supported by BQ + long maxMicros = 253_402_300_799_999_000L; + String expectedMax = "9999-12-31 23:59:59.999"; + assertThat( + BigQueryAvroUtils.formatDatetime( + maxMicros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS), + equalTo(expectedMax)); + assertThat( + BigQueryAvroUtils.formatTimestamp( + maxMicros, BigQueryAvroUtils.TimestampPrecision.MICROSECONDS), + equalTo(expectedMax + " UTC")); } @Test public void testFormatTimestampInputNanos() { + long minNanos = Long.MIN_VALUE; // -9223372036854775808L + String expectedMin = "1677-09-21 00:12:43.145224192"; + assertThat( + BigQueryAvroUtils.formatDatetime( + minNanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS), + equalTo(expectedMin)); + assertThat( + BigQueryAvroUtils.formatTimestamp( + minNanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS), + equalTo(expectedMin + " UTC")); + long nanos = 1452062291123456789L; String expected = "2016-01-06 06:38:11.123456789"; assertThat( @@ -1050,6 +1109,17 @@ public void testFormatTimestampInputNanos() { assertThat( BigQueryAvroUtils.formatTimestamp(nanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS), equalTo(expected + " UTC")); + + long maxNanos = Long.MAX_VALUE; // 9223372036854775807L + String expectedMax = "2262-04-11 23:47:16.854775807"; + assertThat( + BigQueryAvroUtils.formatDatetime( + maxNanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS), + equalTo(expectedMax)); + assertThat( + BigQueryAvroUtils.formatTimestamp( + maxNanos, BigQueryAvroUtils.TimestampPrecision.NANOSECONDS), + equalTo(expectedMax + " UTC")); } @Test From 0ad776f2380c2c020fb3c853ab51b2a19ed19c73 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 25 Nov 2025 17:29:47 -0500 Subject: [PATCH 4/7] Remove println --- .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java index 960027f0b67d..29f17ae8a79e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java @@ -193,7 +193,6 @@ java.time.Instant toInstant(long epochValue) { @VisibleForTesting @SuppressWarnings("JavaInstantGetSecondsGetNano") static String formatDatetime(java.time.Instant instant) { - System.out.println(instant); String dateTime = DATE_TIME_FORMATTER.format(instant); int nanos = instant.getNano(); From 0db14f1e724bca23b6db707ecffc2d51ece0f665 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 26 
Nov 2025 12:03:14 -0500 Subject: [PATCH 5/7] Comments. --- .../extensions/avro/schemas/utils/AvroUtils.java | 9 +++++---- .../sdk/io/gcp/bigquery/BigQueryAvroUtils.java | 14 ++++++++++---- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index d2a2dc49e13c..a4aed91e60e4 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -1197,9 +1197,8 @@ private static org.apache.avro.Schema getFieldSchema( baseType = LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG)); } else if (Timestamp.IDENTIFIER.equals(identifier)) { - baseType = - new org.apache.avro.Schema.Parser() - .parse("{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}"); + baseType = org.apache.avro.Schema.create(Type.LONG); + baseType.addProp("logicalType", TIMESTAMP_NANOS_LOGICAL_TYPE); } else { throw new RuntimeException( "Unhandled logical type " + checkNotNull(fieldType.getLogicalType()).getIdentifier()); @@ -1419,7 +1418,9 @@ private static Object convertLogicalType( if (fieldType.getTypeName() == TypeName.LOGICAL_TYPE && org.apache.beam.sdk.schemas.logicaltypes.Timestamp.IDENTIFIER.equals( fieldType.getLogicalType().getIdentifier())) { - return java.time.Instant.ofEpochSecond(0L, nanos); + long seconds = Math.floorDiv(nanos, 1_000_000_000L); + long nanoAdjustment = Math.floorMod(nanos, 1_000_000_000L); + return java.time.Instant.ofEpochSecond(seconds, nanoAdjustment); } else { return nanos; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java index 29f17ae8a79e..b5243a8110b7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java @@ -175,11 +175,17 @@ java.time.Instant toInstant(long epochValue) { case MILLISECONDS: return java.time.Instant.ofEpochMilli(epochValue); case MICROSECONDS: - return java.time.Instant.ofEpochSecond( - epochValue / 1_000_000L, (epochValue % 1_000_000L) * 1_000L); + { + long seconds = Math.floorDiv(epochValue, 1_000_000L); + long microsOfSecond = Math.floorMod(epochValue, 1_000_000L); + return java.time.Instant.ofEpochSecond(seconds, microsOfSecond * 1_000L); + } case NANOSECONDS: - return java.time.Instant.ofEpochSecond( - epochValue / 1_000_000_000L, epochValue % 1_000_000_000L); + { + long seconds = Math.floorDiv(epochValue, 1_000_000_000L); + long nanosOfSecond = Math.floorMod(epochValue, 1_000_000_000L); + return java.time.Instant.ofEpochSecond(seconds, nanosOfSecond); + } default: throw new IllegalStateException("Unknown precision: " + this); } From ae5dabf1f9487b145b5ca72133d5821f26cdd2b4 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Dec 2025 22:00:17 +0000 Subject: [PATCH 6/7] Comments --- .../sdk/extensions/avro/schemas/utils/AvroUtils.java | 10 ++++++++-- .../extensions/avro/schemas/utils/AvroUtilsTest.java | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git 
a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java index a4aed91e60e4..882e46208a96 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java @@ -1197,6 +1197,11 @@ private static org.apache.avro.Schema getFieldSchema( baseType = LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG)); } else if (Timestamp.IDENTIFIER.equals(identifier)) { + int precision = checkNotNull(logicalType.getArgument()); + if (precision != 9) { + throw new RuntimeException( + "Timestamp logical type precision not supported:" + precision); + } baseType = org.apache.avro.Schema.create(Type.LONG); baseType.addProp("logicalType", TIMESTAMP_NANOS_LOGICAL_TYPE); } else { @@ -1355,8 +1360,9 @@ private static org.apache.avro.Schema getFieldSchema( + TimeUnit.NANOSECONDS.toMicros(instant.getNano()); } else if (Timestamp.IDENTIFIER.equals(identifier)) { java.time.Instant instant = (java.time.Instant) value; - // Use BigInteger to work around long overflows so that minimum timestamp can be - // supported. + // Use BigInteger to work around long overflows so that epochNanos = Long.MIN_VALUE can be + // supported. Instant always stores nanos as positive adjustment so the math will silently + // overflow with regular int64. BigInteger epochSeconds = BigInteger.valueOf(instant.getEpochSecond()); BigInteger nanosOfSecond = BigInteger.valueOf(instant.getNano()); BigInteger epochNanos = diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java index d4b46e2b4484..d087ed0a20bc 100644 --- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java +++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java @@ -595,7 +595,7 @@ public void testTimestampNanosRoundTrip() { java.time.Instant.parse("2000-01-01T00:00:00.123456789Z"), // full nanos java.time.Instant.parse("2000-01-01T00:00:00.999999999Z"), // max nanos java.time.Instant.ofEpochSecond(0L, Long.MAX_VALUE), // max supported - java.time.Instant.ofEpochSecond(0L, Long.MIN_VALUE), // min supported + java.time.Instant.parse("1677-09-21T00:12:43.145224192Z"), // min supported by an int64 }; org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema); From ad341ca89189f840880adb5eb5c3472503a2c2c5 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 2 Dec 2025 22:03:10 +0000 Subject: [PATCH 7/7] Comments. 
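
For reference (illustrative snippet, not part of the change), the boundary
constants exercised in the tests follow from the TIMESTAMP range in the linked
BigQuery documentation, 0001-01-01 00:00:00 UTC through
9999-12-31 23:59:59.999999 UTC:

    import java.time.LocalDateTime;
    import java.time.ZoneOffset;

    class BqTimestampBounds {
      public static void main(String[] args) {
        long minMillis =
            LocalDateTime.of(1, 1, 1, 0, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli();
        long maxMillis =
            LocalDateTime.of(9999, 12, 31, 23, 59, 59, 999_000_000)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
        System.out.println(minMillis); // -62135596800000
        System.out.println(maxMillis); // 253402300799999
      }
    }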
--- .../apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java index 5a0c35eca2fe..e95e15465966 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java @@ -1017,6 +1017,7 @@ public void testConvertAvroSchemaToBigQuerySchema() { @Test public void testFormatTimestampInputMillis() { // Min: Earliest timestamp supported by BQ + // https://docs.cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp_type long minMillis = -62135596800000L; String expectedMin = "0001-01-01 00:00:00"; assertThat(
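
Appendix (standalone sketch, not part of any patch in this series): the
fractional-second behavior the new formatter implements, printing the smallest
of 0, 3, 6, or 9 digits that represents the value exactly, can be reproduced
outside Beam as follows; the class and method names here are illustrative only.

    import java.time.Instant;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;

    class NanosFormatSketch {
      private static final DateTimeFormatter SECONDS =
          DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

      // Mirrors the formatDatetime logic added in this series.
      static String format(Instant instant) {
        String base = SECONDS.format(instant);
        int nanos = instant.getNano();
        if (nanos == 0) {
          return base;
        } else if (nanos % 1_000_000 == 0) {
          return base + String.format(".%03d", nanos / 1_000_000);
        } else if (nanos % 1_000 == 0) {
          return base + String.format(".%06d", nanos / 1_000);
        }
        return base + String.format(".%09d", nanos);
      }

      public static void main(String[] args) {
        long nanos = 1_452_062_291_123_456_789L; // timestamp-nanos value from the tests
        Instant instant =
            Instant.ofEpochSecond(
                Math.floorDiv(nanos, 1_000_000_000L), Math.floorMod(nanos, 1_000_000_000L));
        System.out.println(format(instant) + " UTC"); // 2016-01-06 06:38:11.123456789 UTC
      }
    }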