@@ -115,6 +115,13 @@
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
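<!-- connector-common test-jar: presumably the module that provides the SinkFlowTestUtils / SourceFlowTestUtils helpers imported by the Kafka e2e tests in this change -->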
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

</project>
@@ -39,8 +39,13 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
import org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkFactory;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.sink.SinkFlowTestUtils;
import org.apache.seatunnel.connectors.seatunnel.source.SourceFlowTestUtils;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
@@ -2013,6 +2018,227 @@ private SeaTunnelRow buildSeaTunnelRow() {
return seaTunnelRow;
}

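/**
 * Sink-side check: writes rows whose protobuf field names mix PascalCase, camelCase and
 * snake_case through the Kafka sink, then deserializes the topic and verifies the values.
 */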
@TestTemplate
public void testProtobufCaseSensitiveFieldNames(TestContainer container) throws Exception {
SeaTunnelRowType seaTunnelRowType = buildCaseSensitiveSeaTunnelRowType();

Map<String, String> schemaProperties = new HashMap<>();
schemaProperties.put("protobuf_message_name", "TestMessage");
schemaProperties.put(
"protobuf_schema",
"syntax = \"proto3\";\n"
+ "package org.apache.seatunnel.format.protobuf;\n"
+ "message TestMessage {\n"
+ " int32 MyIntField = 1;\n"
+ " string CamelCaseString = 2;\n"
+ " string snake_case_field = 3;\n"
+ " message NestedObject {\n"
+ " string NestedField = 1;\n"
+ " int32 AnotherField = 2;\n"
+ " }\n"
+ " NestedObject nestedObject = 4;\n"
+ " map<string, int32> MyMapField = 5;\n"
+ "}");

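// Build a catalog table whose columns mirror the case-sensitive row type (all columns nullable).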
TableSchema schema =
TableSchema.builder()
.columns(
Arrays.asList(
IntStream.range(0, seaTunnelRowType.getTotalFields())
.mapToObj(
i ->
PhysicalColumn.of(
seaTunnelRowType
.getFieldName(i),
seaTunnelRowType
.getFieldType(i),
0,
true,
null,
null))
.toArray(PhysicalColumn[]::new)))
.build();

CatalogTable catalogTable =
CatalogTable.of(
TableIdentifier.of("", "", "", "test"),
schema,
schemaProperties,
Collections.emptyList(),
"Protobuf case-sensitive test");

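// Kafka sink options: target topic, bootstrap servers, and the same protobuf message/schema.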
Map<String, Object> config = new HashMap<>();
config.put(KafkaBaseOptions.TOPIC.key(), "test_protobuf_case_sensitive_topic");
config.put(KafkaBaseOptions.BOOTSTRAP_SERVERS.key(), kafkaContainer.getBootstrapServers());
config.put(KafkaBaseOptions.FORMAT.key(), MessageFormat.PROTOBUF);
config.put("protobuf_message_name", "TestMessage");
config.put(
"protobuf_schema",
"syntax = \"proto3\";\n"
+ "package org.apache.seatunnel.format.protobuf;\n"
+ "message TestMessage {\n"
+ " int32 MyIntField = 1;\n"
+ " string CamelCaseString = 2;\n"
+ " string snake_case_field = 3;\n"
+ " message NestedObject {\n"
+ " string NestedField = 1;\n"
+ " int32 AnotherField = 2;\n"
+ " }\n"
+ " NestedObject nestedObject = 4;\n"
+ " map<string, int32> MyMapField = 5;\n"
+ "}");

List<SeaTunnelRow> rows = createCaseSensitiveTestRows();

// Use SinkFlowTestUtils to write data to Kafka
SinkFlowTestUtils.runBatchWithCheckpointDisabled(
catalogTable, ReadonlyConfig.fromMap(config), new KafkaSinkFactory(), rows);

// Verify data from Kafka
ProtobufDeserializationSchema deserializationSchema =
new ProtobufDeserializationSchema(catalogTable);

List<SeaTunnelRow> kafkaSTRow =
getKafkaSTRow(
"test_protobuf_case_sensitive_topic",
value -> {
try {
return deserializationSchema.deserialize(value);
} catch (IOException e) {
throw new RuntimeException("Error deserializing Kafka message", e);
}
});

Assertions.assertEquals(2, kafkaSTRow.size());

kafkaSTRow.forEach(
row -> {
Assertions.assertAll(
"Verify case-sensitive field values",
() -> Assertions.assertNotNull(row.getField(0)), // MyIntField
() -> Assertions.assertNotNull(row.getField(1)), // CamelCaseString
() -> Assertions.assertNotNull(row.getField(2)), // snake_case_field
() -> {
SeaTunnelRow nestedRow = (SeaTunnelRow) row.getField(3);
Assertions.assertNotNull(nestedRow); // NestedObject
Assertions.assertNotNull(nestedRow.getField(0)); // NestedField
Assertions.assertNotNull(nestedRow.getField(1)); // AnotherField
},
() -> {
@SuppressWarnings("unchecked")
Map<String, Integer> mapField =
(Map<String, Integer>) row.getField(4);
Assertions.assertNotNull(mapField); // MyMapField
});
});
}

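/**
 * Source-side check: produces protobuf-encoded rows to Kafka with DefaultSeaTunnelRowSerializer,
 * then reads them back through the Kafka source factory and verifies the case-sensitive fields.
 */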
@TestTemplate
public void testProtobufCaseSensitiveToAssert(TestContainer container) throws Exception {
SeaTunnelRowType seaTunnelRowType = buildCaseSensitiveSeaTunnelRowType();

// Write test data to Kafka first
String confFile = "/protobuf/kafka_protobuf_case_sensitive_to_assert.conf";
String path = getTestConfigFile(confFile);
Config config = ConfigFactory.parseFile(new File(path));
Config sourceConfig = config.getConfigList("source").get(0);
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sourceConfig);

DefaultSeaTunnelRowSerializer serializer =
getDefaultSeaTunnelRowSerializer(
"test_protobuf_case_sensitive_topic", seaTunnelRowType, readonlyConfig);

List<SeaTunnelRow> testRows = createCaseSensitiveTestRows();

for (SeaTunnelRow row : testRows) {
ProducerRecord<byte[], byte[]> producerRecord = serializer.serializeRow(row);
try {
producer.send(producerRecord).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Error sending Kafka message", e);
}
}
producer.flush();

// Use SourceFlowTestUtils to read data from Kafka
Map<String, Object> sourceOptions = new HashMap<>();
sourceOptions.put(KafkaBaseOptions.TOPIC.key(), "test_protobuf_case_sensitive_topic");
sourceOptions.put(
KafkaBaseOptions.BOOTSTRAP_SERVERS.key(), kafkaContainer.getBootstrapServers());
sourceOptions.put(KafkaBaseOptions.FORMAT.key(), MessageFormat.PROTOBUF);
sourceOptions.put(
"protobuf_message_name",
readonlyConfig.get(KafkaBaseOptions.PROTOBUF_MESSAGE_NAME));
sourceOptions.put("protobuf_schema", readonlyConfig.get(KafkaBaseOptions.PROTOBUF_SCHEMA));

List<SeaTunnelRow> readRows =
SourceFlowTestUtils.runBatchWithCheckpointDisabled(
ReadonlyConfig.fromMap(sourceOptions), new KafkaSourceFactory());

Assertions.assertEquals(2, readRows.size());

readRows.forEach(
row -> {
Assertions.assertAll(
"Verify case-sensitive field values from source",
() -> Assertions.assertNotNull(row.getField(0)), // MyIntField
() -> Assertions.assertNotNull(row.getField(1)), // CamelCaseString
() -> Assertions.assertNotNull(row.getField(2))); // snake_case_field
});
}

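// Row type mixing PascalCase, camelCase and snake_case field names, plus a nested row and a map.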
private SeaTunnelRowType buildCaseSensitiveSeaTunnelRowType() {
SeaTunnelRowType nestedType =
new SeaTunnelRowType(
new String[] {"NestedField", "AnotherField"},
new SeaTunnelDataType<?>[] {BasicType.STRING_TYPE, BasicType.INT_TYPE});

return new SeaTunnelRowType(
new String[] {
"MyIntField",
"CamelCaseString",
"snake_case_field",
"NestedObject",
"MyMapField"
},
new SeaTunnelDataType<?>[] {
BasicType.INT_TYPE,
BasicType.STRING_TYPE,
BasicType.STRING_TYPE,
nestedType,
new MapType<>(BasicType.STRING_TYPE, BasicType.INT_TYPE)
});
}

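// Two test rows sharing the same nested row and map values.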
private List<SeaTunnelRow> createCaseSensitiveTestRows() {
SeaTunnelRow nestedRow = new SeaTunnelRow(2);
nestedRow.setField(0, "nested_value");
nestedRow.setField(1, 999);

Map<String, Integer> mapData = new HashMap<>();
mapData.put("key1", 100);
mapData.put("key2", 200);

SeaTunnelRow row1 = new SeaTunnelRow(5);
row1.setField(0, 1);
row1.setField(1, "test_string_1");
row1.setField(2, "snake_value_1");
row1.setField(3, nestedRow);
row1.setField(4, mapData);

SeaTunnelRow row2 = new SeaTunnelRow(5);
row2.setField(0, 2);
row2.setField(1, "test_string_2");
row2.setField(2, "snake_value_2");
row2.setField(3, nestedRow);
row2.setField(4, mapData);

return Arrays.asList(row1, row2);
}

private SeaTunnelRowType buildSeaTunnelRowType() {
SeaTunnelRowType addressType =
new SeaTunnelRowType(
@@ -0,0 +1,86 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"

# spark config
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local

}
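# FakeSource generates rows with deliberately mixed-case field names (PascalCase, camelCase, snake_case).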
source {
FakeSource {
parallelism = 1
plugin_output = "fake"
row.num = 16
schema = {
fields {
MyIntField = int
CamelCaseString = string
snake_case_field = string

NestedObject {
NestedField = string
AnotherField = int
}
MyMapField = "map<string,int>"
}
}
}
}

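# The Kafka sink serializes the generated rows to protobuf using the inline schema below.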
sink {
kafka {
topic = "test_protobuf_case_sensitive_topic"
bootstrap.servers = "kafkaCluster:9092"
format = protobuf
kafka.request.timeout.ms = 60000
kafka.config = {
acks = "all"
request.timeout.ms = 60000
buffer.memory = 33554432
}
protobuf_message_name = TestCaseSensitive
protobuf_schema = """
syntax = "proto3";

package org.apache.seatunnel.format.protobuf;

option java_outer_classname = "ProtobufCaseSensitiveE2E";

message TestCaseSensitive {
int32 MyIntField = 1;
string CamelCaseString = 2;
string snake_case_field = 3;

message NestedObject {
string NestedField = 1;
int32 AnotherField = 2;
}

NestedObject nestedObject = 4;

map<string, int32> MyMapField = 5;
}
"""
}
}
