Merged
Add timestamp-nanos Avro logical type support in BigQuery utils.
claudevdm committed Nov 25, 2025
commit 1d0a0ef8308ddd6c689ed0ce3cb0895c2d076c82
@@ -81,6 +81,7 @@
import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
@@ -137,6 +138,7 @@
* LogicalTypes.TimestampMillis <-----> DATETIME
* LogicalTypes.TimestampMicros ------> Long
* LogicalTypes.TimestampMicros <------ LogicalType(urn="beam:logical_type:micros_instant:v1")
* LogicalTypes.TimestampNanos <------> LogicalType(TIMESTAMP(9))
* LogicalTypes.Decimal <-----> DECIMAL
* </pre>
*
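As a reference for the new row in the mapping table above: a Beam field declared with the `Timestamp.NANOS` logical type converts to an Avro `long` carrying the `timestamp-nanos` logical type. A minimal sketch, mirroring the tests added later in this PR:

```java
// Sketch only; mirrors testBeamTimestampNanosLogicalTypeToAvroSchema below.
Schema beamSchema =
    Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
// The "timestampNanos" field is now {"type": "long", "logicalType": "timestamp-nanos"}.
```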
@@ -164,6 +166,8 @@ public class AvroUtils {

private static final GenericData GENERIC_DATA_WITH_DEFAULT_CONVERSIONS;

private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos";

static {
GENERIC_DATA_WITH_DEFAULT_CONVERSIONS = new GenericData();
addLogicalTypeConversions(GENERIC_DATA_WITH_DEFAULT_CONVERSIONS);
@@ -1027,6 +1031,11 @@ private static FieldType toFieldType(TypeWithNullability type) {
fieldType = FieldType.DATETIME;
}
}
// TODO: Remove once Avro is upgraded to 1.12+, which adds the timestamp-nanos logical type.
if (fieldType == null
&& TIMESTAMP_NANOS_LOGICAL_TYPE.equals(avroSchema.getProp("logicalType"))) {
fieldType = FieldType.logicalType(Timestamp.NANOS);
}

if (fieldType == null) {
switch (type.type.getType()) {
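This is the Avro-to-Beam direction: a `long` schema carrying the `timestamp-nanos` property becomes a `Timestamp.NANOS` field. A minimal sketch of the mapping, using the same schema JSON the PR's tests parse (the record wrapper is assumed for illustration):

```java
// Sketch only; a standalone field schema as parsed in the tests below.
org.apache.avro.Schema nanosField =
    new org.apache.avro.Schema.Parser()
        .parse("{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}");
// Wrapped in a record and passed through AvroUtils.toBeamSchema, this field
// comes back as FieldType.logicalType(Timestamp.NANOS).
```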
@@ -1186,6 +1195,10 @@ private static org.apache.avro.Schema getFieldSchema(
} else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
baseType =
LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG));
} else if (Timestamp.IDENTIFIER.equals(identifier)) {
baseType =
new org.apache.avro.Schema.Parser()
.parse("{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}");
} else {
throw new RuntimeException(
"Unhandled logical type " + checkNotNull(fieldType.getLogicalType()).getIdentifier());
@@ -1340,6 +1353,11 @@ private static org.apache.avro.Schema getFieldSchema(
java.time.Instant instant = (java.time.Instant) value;
return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
+ TimeUnit.NANOSECONDS.toMicros(instant.getNano());
} else if (Timestamp.IDENTIFIER.equals(identifier)) {
java.time.Instant instant = (java.time.Instant) value;
long epochSeconds = instant.getEpochSecond();
int nanoOfSecond = instant.getNano();
return (epochSeconds * 1_000_000_000L) + nanoOfSecond;
} else {
throw new RuntimeException("Unhandled logical type " + identifier);
}
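The write path above flattens a `java.time.Instant` into a single nanos-since-epoch `long`. A worked example, with the input value chosen purely for illustration:

```java
// Illustrative value: 2000-01-01T01:02:03.123456789Z.
java.time.Instant instant = java.time.Instant.parse("2000-01-01T01:02:03.123456789Z");
long nanos = instant.getEpochSecond() * 1_000_000_000L + instant.getNano();
// 946688523 s * 1_000_000_000 + 123_456_789 ns == 946688523123456789L
```

Note that a signed 64-bit nanos-since-epoch value only reaches to roughly the year 2262, the usual caveat for timestamp-nanos.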
@@ -1387,6 +1405,22 @@ private static Object convertLogicalType(
@Nonnull FieldType fieldType,
@Nonnull GenericData genericData) {
TypeWithNullability type = new TypeWithNullability(avroSchema);

// TODO: Remove this workaround once Avro is upgraded to 1.12+, where timestamp-nanos is a
// built-in logical type.
if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.type.getProp("logicalType"))) {
if (type.type.getType() == Type.LONG) {
Long nanos = (Long) value;
// Check if Beam expects Timestamp logical type
if (fieldType.getTypeName() == TypeName.LOGICAL_TYPE
&& org.apache.beam.sdk.schemas.logicaltypes.Timestamp.IDENTIFIER.equals(
fieldType.getLogicalType().getIdentifier())) {
return java.time.Instant.ofEpochSecond(0L, nanos);
} else {
return nanos;
}
}
}

LogicalType logicalType = LogicalTypes.fromSchema(type.type);
if (logicalType == null) {
return null;
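On the read path, `Instant.ofEpochSecond(0L, nanos)` reverses that flattening; the JDK carries the nanosecond overflow into whole seconds. A quick sketch decoding the value from the earlier example (value assumed):

```java
long nanos = 946688523123456789L; // nanos-since-epoch from the Avro long
java.time.Instant decoded = java.time.Instant.ofEpochSecond(0L, nanos);
// decoded.equals(java.time.Instant.parse("2000-01-01T01:02:03.123456789Z"))
```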
@@ -54,6 +54,7 @@
import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -549,6 +550,86 @@ public void testFromBeamSchema() {
assertEquals(getAvroSchema(), avroSchema);
}

@Test
public void testBeamTimestampNanosLogicalTypeToAvroSchema() {
Schema beamSchema =
Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();

// Expected Avro schema with timestamp-nanos
String expectedJson =
"{\"type\": \"record\", \"name\": \"topLevelRecord\", "
+ "\"fields\": [{\"name\": \"timestampNanos\", "
+ "\"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}}]}";

org.apache.avro.Schema expectedAvroSchema =
new org.apache.avro.Schema.Parser().parse(expectedJson);

assertEquals(expectedAvroSchema, AvroUtils.toAvroSchema(beamSchema));
}

@Test
public void testBeamTimestampNanosToGenericRecord() {
Schema beamSchema =
Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();

java.time.Instant instant = java.time.Instant.parse("2000-01-01T01:02:03.123456789Z");
Row beamRow = Row.withSchema(beamSchema).addValue(instant).build();

// Expected nanos since epoch
long expectedNanos = TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano();

org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
GenericRecord avroRecord = AvroUtils.toGenericRecord(beamRow, avroSchema);

assertEquals(expectedNanos, avroRecord.get("timestampNanos"));
}

@Test
public void testTimestampNanosRoundTrip() {
Schema beamSchema =
Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();

// Test various nanosecond precisions
java.time.Instant[] testInstants = {
java.time.Instant.parse("2000-01-01T00:00:00.000000001Z"), // 1 nano
java.time.Instant.parse("2000-01-01T00:00:00.123456789Z"), // full nanos
java.time.Instant.parse("2000-01-01T00:00:00.999999999Z"), // max nanos
};

org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);

for (java.time.Instant instant : testInstants) {
Row originalRow = Row.withSchema(beamSchema).addValue(instant).build();
GenericRecord avroRecord = AvroUtils.toGenericRecord(originalRow, avroSchema);
Row roundTripRow = AvroUtils.toBeamRowStrict(avroRecord, beamSchema);

assertEquals(originalRow, roundTripRow);
java.time.Instant roundTripInstant =
(java.time.Instant) roundTripRow.getValue("timestampNanos");
assertEquals(instant, roundTripInstant);
}
}

@Test
public void testTimestampNanosAvroSchemaToBeamSchema() {
List<org.apache.avro.Schema.Field> fields = Lists.newArrayList();
fields.add(
new org.apache.avro.Schema.Field(
"timestampNanos",
new org.apache.avro.Schema.Parser()
.parse("{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}"),
"",
(Object) null));
org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("test", null, null, false, fields);

Schema beamSchema = AvroUtils.toBeamSchema(avroSchema);

Schema expected =
Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
assertEquals(expected, beamSchema);
}

@Test
public void testAvroSchemaFromBeamSchemaCanBeParsed() {
org.apache.avro.Schema convertedSchema = AvroUtils.toAvroSchema(getBeamSchema());
@@ -60,7 +60,7 @@ class BigQueryAvroUtils {
Optional.ofNullable(Schema.class.getPackage())
.map(Package::getImplementationVersion)
.orElse("");

private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos";

// org.apache.avro.LogicalType
static class DateTimeLogicalType extends LogicalType {
public DateTimeLogicalType() {
@@ -164,33 +164,76 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTypes) {
private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();

/**
* Enum to define the precision of a timestamp since the epoch. It provides methods to normalize
* any precision to seconds and nanoseconds.
*/
enum TimestampPrecision {
MILLISECONDS(1_000L, 1_000_000L),
MICROSECONDS(1_000_000L, 1_000L),
NANOSECONDS(1_000_000_000L, 1L);

private final long divisorForSeconds;
private final long nanoMultiplier;

TimestampPrecision(long divisorForSeconds, long nanoMultiplier) {
this.divisorForSeconds = divisorForSeconds;
this.nanoMultiplier = nanoMultiplier;
}

public long getDivisorForSeconds() {
return divisorForSeconds;
}

public long toNanos(long fractionalPart) {
return fractionalPart * this.nanoMultiplier;
}

public String formatFractional(long nanoOfSecond) {
if (nanoOfSecond % 1_000_000 == 0) {
return String.format(".%03d", nanoOfSecond / 1_000_000);
} else if (nanoOfSecond % 1000 == 0) {
return String.format(".%06d", nanoOfSecond / 1000);
} else {
return String.format(".%09d", nanoOfSecond);
}
}
}
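`formatFractional` emits the shortest of millisecond, microsecond, or nanosecond output that represents the value exactly. A few illustrative inputs (all nanos-of-second, values assumed):

```java
// formatFractional does not depend on the receiving constant.
TimestampPrecision p = TimestampPrecision.NANOSECONDS;
p.formatFractional(123_000_000L); // ".123" (whole milliseconds)
p.formatFractional(123_456_000L); // ".123456" (whole microseconds)
p.formatFractional(123_456_789L); // ".123456789"
```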

/**
* Formats a timestamp value with specified precision.
*
* @param timestamp The timestamp value in units specified by precision (milliseconds,
* microseconds, or nanoseconds since epoch)
* @param precision The precision of the input timestamp
* @return Formatted string in "yyyy-MM-dd HH:mm:ss[.fraction]" format
*/
@VisibleForTesting
static String formatDatetime(long timestamp, TimestampPrecision precision) {
long divisor = precision.getDivisorForSeconds();
long timestampSec = timestamp / divisor;
long fractionalPart = timestamp % divisor;

if (fractionalPart < 0) {
fractionalPart += divisor;
timestampSec -= 1;
}

String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(timestampSec * 1000);

long nanoOfSecond = precision.toNanos(fractionalPart);

if (nanoOfSecond == 0) {
return dayAndTime;
}
return dayAndTime + precision.formatFractional(nanoOfSecond);
}

static String formatTimestamp(long timestamp, TimestampPrecision precision) {
return formatDatetime(timestamp, precision) + " UTC";
}
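Taken together, the same wall-clock instant renders identically at any input precision, with the fraction trimmed to the coarsest exact unit. A worked example, with epoch values assumed for illustration:

```java
// 2016-01-06 06:38:11.123456789 UTC, expressed at each precision:
formatTimestamp(1452062291123L, TimestampPrecision.MILLISECONDS);
// -> "2016-01-06 06:38:11.123 UTC"
formatTimestamp(1452062291123456L, TimestampPrecision.MICROSECONDS);
// -> "2016-01-06 06:38:11.123456 UTC"
formatTimestamp(1452062291123456789L, TimestampPrecision.NANOSECONDS);
// -> "2016-01-06 06:38:11.123456789 UTC"
```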

/**
* This method formats a BigQuery DATE value into a String matching the format used by JSON
* export. Date records are stored in "days since epoch" format, and BigQuery uses the proleptic
@@ -335,7 +378,6 @@ private static Object convertRequiredField(String name, Schema schema, Object v)
// REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
// INTEGER type maps to an Avro LONG type.
checkNotNull(v, "REQUIRED field %s should not be null", name);

Type type = schema.getType();
LogicalType logicalType = schema.getLogicalType();
switch (type) {
@@ -364,21 +406,26 @@ private static Object convertRequiredField(String name, Schema schema, Object v)
} else if (logicalType instanceof LogicalTypes.TimestampMillis) {
// Write only: SQL type TIMESTAMP
// ideally Instant but TableRowJsonCoder encodes as String
return formatTimestamp((Long) v, TimestampPrecision.MILLISECONDS);
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
// SQL type TIMESTAMP
// ideally Instant but TableRowJsonCoder encodes as String
return formatTimestamp((Long) v, TimestampPrecision.MICROSECONDS);
// TODO: Use LogicalTypes.TimestampNanos once the Avro version is updated.
} else if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(schema.getProp("logicalType"))) {
// SQL type TIMESTAMP
// ideally Instant but TableRowJsonCoder encodes as String
return formatTimestamp((Long) v, TimestampPrecision.NANOSECONDS);
} else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
&& logicalType instanceof LogicalTypes.LocalTimestampMillis) {
// Write only: SQL type DATETIME
// ideally LocalDateTime but TableRowJsonCoder encodes as String
return formatDatetime(((Long) v), TimestampPrecision.MILLISECONDS);
} else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
&& logicalType instanceof LogicalTypes.LocalTimestampMicros) {
// Write only: SQL type DATETIME
// ideally LocalDateTime but TableRowJsonCoder encodes as String
return formatDatetime((Long) v, TimestampPrecision.MICROSECONDS);
} else {
// SQL type INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
// ideally Long if in [2^53+1, 2^53-1] but keep consistency with BQ JSON export that uses
@@ -602,6 +649,10 @@ private static TableFieldSchema typedTableFieldSchema(Schema type, Boolean useAvroLogicalTypes) {
return fieldSchema.setType("INTEGER");
}
case LONG:
// TODO: Use LogicalTypes.TimestampNanos once the Avro version is updated.
if (useAvroLogicalTypes
    && TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getProp("logicalType"))) {
return fieldSchema.setType("TIMESTAMP");
}
if (logicalType instanceof LogicalTypes.TimeMicros) {
return fieldSchema.setType("TIME");
} else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))