Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ object SchemaConverters {
catalystType: DataType,
nullable: Boolean = false,
recordName: String = "topLevelRecord",
prevNameSpace: String = "")
nameSpace: String = "")
: Schema = {
val builder = SchemaBuilder.builder()

Expand All @@ -143,29 +143,25 @@ object SchemaConverters {
val avroType = LogicalTypes.decimal(d.precision, d.scale)
val fixedSize = minBytesForPrecision(d.precision)
// Need to avoid naming conflict for the fixed fields
val name = prevNameSpace match {
val name = nameSpace match {
case "" => s"$recordName.fixed"
case _ => s"$prevNameSpace.$recordName.fixed"
case _ => s"$nameSpace.$recordName.fixed"
}
avroType.addToSchema(SchemaBuilder.fixed(name).size(fixedSize))

case BinaryType => builder.bytesType()
case ArrayType(et, containsNull) =>
builder.array()
.items(toAvroType(et, containsNull, recordName, prevNameSpace))
.items(toAvroType(et, containsNull, recordName, nameSpace))
case MapType(StringType, vt, valueContainsNull) =>
builder.map()
.values(toAvroType(vt, valueContainsNull, recordName, prevNameSpace))
.values(toAvroType(vt, valueContainsNull, recordName, nameSpace))
case st: StructType =>
val nameSpace = prevNameSpace match {
case "" => recordName
case _ => s"$prevNameSpace.$recordName"
}

val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName" else recordName
val fieldsAssembler = builder.record(recordName).namespace(nameSpace).fields()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, this line is the only difference for the whole code change. The namespace here should not be the one with recordName at the end.

st.foreach { f =>
val fieldAvroType =
toAvroType(f.dataType, f.nullable, f.name, nameSpace)
toAvroType(f.dataType, f.nullable, f.name, childNameSpace)
fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault()
}
fieldsAssembler.endRecord()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1082,7 +1082,6 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
val schema = getAvroSchemaStringFromFiles(dir.toString)
assert(schema.contains("\"namespace\":\"topLevelRecord\""))
assert(schema.contains("\"namespace\":\"topLevelRecord.data\""))
assert(schema.contains("\"namespace\":\"topLevelRecord.data.data\""))
}
}

Expand All @@ -1099,6 +1098,47 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
}

test("check namespace - toAvroType") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arunmahadevan, can we add a simple end-to-end test as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's sort of covered by the existing test cases below. Do you think we need more?

Validate namespace in avro file that has nested records with the same name
conversion to avro and back with namespace

val sparkSchema = StructType(Seq(
StructField("name", StringType, nullable = false),
StructField("address", StructType(Seq(
StructField("city", StringType, nullable = false),
StructField("state", StringType, nullable = false))),
nullable = false)))
val employeeType = SchemaConverters.toAvroType(sparkSchema,
recordName = "employee",
nameSpace = "foo.bar")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could you also add a case where nameSpace is ""?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test case for toAvroType with an empty namespace.


assert(employeeType.getFullName == "foo.bar.employee")
assert(employeeType.getName == "employee")
assert(employeeType.getNamespace == "foo.bar")

val addressType = employeeType.getField("address").schema()
assert(addressType.getFullName == "foo.bar.employee.address")
assert(addressType.getName == "address")
assert(addressType.getNamespace == "foo.bar.employee")
}

// Verifies the record names and namespaces produced by SchemaConverters.toAvroType
// when the caller passes a recordName but no nameSpace argument.
test("check empty namespace - toAvroType") {
// Catalyst schema under test: a non-nullable string field plus a non-nullable
// nested struct field ("address") that itself holds two non-nullable strings.
val sparkSchema = StructType(Seq(
StructField("name", StringType, nullable = false),
StructField("address", StructType(Seq(
StructField("city", StringType, nullable = false),
StructField("state", StringType, nullable = false))),
nullable = false)))
// Only recordName is supplied here; nameSpace is deliberately left out.
val employeeType = SchemaConverters.toAvroType(sparkSchema,
recordName = "employee")

// Top-level record: the full name is expected to equal the simple name, and the
// namespace is expected to be reported as null.
assert(employeeType.getFullName == "employee")
assert(employeeType.getName == "employee")
assert(employeeType.getNamespace == null)

// Nested record: expected to be namespaced under the parent record's name, so
// its full name becomes "<parent>.<field>".
val addressType = employeeType.getField("address").schema()
assert(addressType.getFullName == "employee.address")
assert(addressType.getName == "address")
assert(addressType.getNamespace == "employee")
}

// Case class pairing an integer id with an array of NestedBottom values
// (NestedBottom is declared outside the visible part of this file).
case class NestedMiddleArray(id: Int, data: Array[NestedBottom])

// Case class pairing an integer id with a NestedMiddleArray, giving a record whose
// "data" field nests another record that also has a "data" field.
case class NestedTopArray(id: Int, data: NestedMiddleArray)
Expand Down