From 77b26ef439514b47edb1ae430d59a5e06e54b3ed Mon Sep 17 00:00:00 2001 From: chestnufang Date: Thu, 24 Jul 2025 17:40:09 +0800 Subject: [PATCH 1/2] [Fix][Connector-Jdbc]Supports reading and writing Postgres network address types --- .../psql/PostgresJdbcRowConverter.java | 17 ++++-- .../dialect/psql/PostgresTypeConverter.java | 6 ++ .../psql/PostgresTypeConverterTest.java | 35 +++++++++++ .../seatunnel/cdc/postgres/PostgresCDCIT.java | 31 ++++++++++ .../src/test/resources/ddl/inventory.sql | 32 ++++++++++ ...o_postgres_with_network_address_types.conf | 60 +++++++++++++++++++ 6 files changed, 176 insertions(+), 5 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_network_address_types.conf diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java index b8c1d21aa93..6866e428e4c 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java @@ -52,8 +52,11 @@ import java.util.Locale; import java.util.Optional; +import static org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_CIDR; import static org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_INET; import static org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_INTERVAL; +import static org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_MAC_ADDR; +import static org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_MAC_ADDR8; public class PostgresJdbcRowConverter extends AbstractJdbcRowConverter { @@ -188,11 +191,15 @@ public PreparedStatement toExternal( switch (seaTunnelDataType.getSqlType()) { case STRING: String sourceType = sourceTypes[fieldIndex]; - if (PG_INET.equalsIgnoreCase(sourceType)) { - PGobject inetObject = new PGobject(); - inetObject.setType(PG_INET); - inetObject.setValue(String.valueOf(row.getField(fieldIndex))); - statement.setObject(statementIndex, inetObject); + if (PG_INET.equalsIgnoreCase(sourceType) + || PG_CIDR.equalsIgnoreCase(sourceType) + || PG_MAC_ADDR.equalsIgnoreCase(sourceType) + || PG_MAC_ADDR8.equalsIgnoreCase(sourceType)) { + // handle network address types of postgres + PGobject networkTypeObject = new PGobject(); + networkTypeObject.setType(sourceType); + networkTypeObject.setValue(String.valueOf(row.getField(fieldIndex))); + statement.setObject(statementIndex, networkTypeObject); } else if (PG_INTERVAL.equalsIgnoreCase(sourceType)) { PGobject intervalObject = new PGobject(); intervalObject.setType(PG_INTERVAL); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java index 3abb5126a37..f472d3bce52 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java @@ -83,6 +83,9 @@ public class PostgresTypeConverter implements TypeConverter { // character varying <=> varchar public static final String PG_VARCHAR = "varchar"; public static final String PG_INET = "inet"; + public static final String PG_CIDR = "cidr"; + public static final String PG_MAC_ADDR = "macaddr"; + public static final String PG_MAC_ADDR8 = "macaddr8"; public static final String PG_CHARACTER_VARYING = "character varying"; // character varying[] <=> varchar[] <=> _varchar public static final String PG_VARCHAR_ARRAY = "_varchar"; @@ -228,6 +231,9 @@ public Column convert(BasicTypeDefine typeDefine) { case PG_GEOGRAPHY: case PG_INET: case PG_INTERVAL: + case PG_CIDR: + case PG_MAC_ADDR: + case PG_MAC_ADDR8: builder.dataType(BasicType.STRING_TYPE); builder.sourceType(pgDataType); break; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java index 28c0b54a561..ce48ddab644 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java @@ -827,4 +827,39 @@ public void testConvertInterval() { Assertions.assertEquals(null, column.getColumnLength()); Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); } + + @Test + public void testConvertNetworkAddressTypes() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder().name("test").columnType("cidr").dataType("cidr").build(); + Column column = PostgresTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType()); + Assertions.assertNull(column.getColumnLength()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + + BasicTypeDefine typeDefine1 = + BasicTypeDefine.builder() + .name("test1") + .columnType("macaddr") + .dataType("macaddr") + .build(); + Column column1 = PostgresTypeConverter.INSTANCE.convert(typeDefine1); + Assertions.assertEquals(typeDefine1.getName(), column1.getName()); + Assertions.assertEquals(BasicType.STRING_TYPE, column1.getDataType()); + Assertions.assertNull(column1.getColumnLength()); + Assertions.assertEquals(typeDefine1.getColumnType(), column1.getSourceType()); + + BasicTypeDefine typeDefine2 = + BasicTypeDefine.builder() + .name("test2") + .columnType("macaddr8") + .dataType("macaddr8") + .build(); + Column column2 = PostgresTypeConverter.INSTANCE.convert(typeDefine2); + Assertions.assertEquals(typeDefine2.getName(), column2.getName()); + Assertions.assertEquals(BasicType.STRING_TYPE, column2.getDataType()); + Assertions.assertNull(column2.getColumnLength()); + Assertions.assertEquals(typeDefine2.getColumnType(), column2.getSourceType()); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java index ed290bbbdde..d6c42de2af6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java @@ -112,10 +112,12 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource { private static final String SOURCE_TABLE_2 = "postgres_cdc_table_2"; private static final String SOURCE_TABLE_3 = "postgres_cdc_table_3"; private static final String SOURCE_TABLE_4 = "postgres_cdc_table_4"; + private static final String SOURCE_TABLE_5 = "postgres_cdc_table_5"; private static final String SINK_TABLE_1 = "sink_postgres_cdc_table_1"; private static final String SINK_TABLE_2 = "sink_postgres_cdc_table_2"; private static final String SINK_TABLE_3 = "sink_postgres_cdc_table_3"; private static final String SINK_TABLE_4 = "sink_postgres_cdc_table_4"; + private static final String SINK_TABLE_5 = "sink_postgres_cdc_table_5"; private static final String SOURCE_TABLE_NO_PRIMARY_KEY = "full_types_no_primary_key"; @@ -804,6 +806,35 @@ public void testPostgresCdcCheckDataWithIntervalDataType(TestContainer container } } + @TestTemplate + public void testPostgresCdcCheckDataWithNetworkAddressTypes(TestContainer container) { + try { + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob( + "/postgrescdc_to_postgres_with_network_address_types.conf"); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + + // stream stage + await().atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertIterableEquals( + query(getQuerySQL(POSTGRESQL_SCHEMA, SOURCE_TABLE_5)), + query(getQuerySQL(POSTGRESQL_SCHEMA, SINK_TABLE_5))); + }); + } finally { + clearTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_5); + clearTable(POSTGRESQL_SCHEMA, SINK_TABLE_5); + } + } + @Test public void testDialectCheckDisabledCDCTable() throws SQLException { JdbcSourceConfigFactory factory = diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql index 095a593cb1a..68cc789b083 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql @@ -216,6 +216,32 @@ CREATE TABLE sink_postgres_cdc_table_4 PRIMARY KEY (id) ); +CREATE TABLE postgres_cdc_table_5 +( + id INTEGER NOT NULL, + f_bytea BYTEA, + f_small SMALLINT, + f_interval INTERVAL, + ip INET, + network CIDR, + mac MACADDR, + mac8 MACADDR8, + PRIMARY KEY (id) +); + +CREATE TABLE sink_postgres_cdc_table_5 +( + id INTEGER NOT NULL, + f_bytea BYTEA, + f_small SMALLINT, + f_interval INTERVAL, + ip INET, + network CIDR, + mac MACADDR, + mac8 MACADDR8, + PRIMARY KEY (id) +); + ALTER TABLE postgres_cdc_table_1 REPLICA IDENTITY FULL; @@ -228,6 +254,9 @@ ALTER TABLE postgres_cdc_table_3 ALTER TABLE postgres_cdc_table_4 REPLICA IDENTITY FULL; +ALTER TABLE postgres_cdc_table_5 + REPLICA IDENTITY FULL; + ALTER TABLE sink_postgres_cdc_table_1 REPLICA IDENTITY FULL; @@ -256,6 +285,9 @@ VALUES (1, '2', 32767, 65535); INSERT INTO postgres_cdc_table_4 VALUES (1, '2', 32767, INTERVAL '2 days 3 hours'); +INSERT INTO postgres_cdc_table_5 (id, f_bytea, f_small, f_interval, ip, network, mac, mac8) +VALUES (1, '2', 32767, INTERVAL '1 day 2 hours', '192.168.1.100', '192.168.1.0/24', '08:00:2b:01:02:03', '08:00:2b:01:02:03:04:05') + INSERT INTO full_types_no_primary_key VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true, 'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_network_address_types.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_network_address_types.conf new file mode 100644 index 00000000000..6c096bca14e --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_network_address_types.conf @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 + read_limit.bytes_per_second=7000000 + read_limit.rows_per_second=400 +} + +source { + Postgres-CDC { + plugin_output = "customers_postgres_cdc" + username = "postgres" + password = "postgres" + database-names = ["postgres_cdc"] + schema-names = ["inventory"] + table-names = ["postgres_cdc.inventory.postgres_cdc_table_5"] + base-url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF" + } +} + +transform { + +} + +sink { + jdbc { + plugin_input = "customers_postgres_cdc" + url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF" + driver = "org.postgresql.Driver" + user = "postgres" + password = "postgres" + + generate_sink_sql = true + # You need to configure both database and table + database = postgres_cdc + table = inventory.sink_postgres_cdc_table_5 + primary_keys = ["id"] + } +} From 775d785012beee9c1d6bdd5f08ca8b77ea10e039 Mon Sep 17 00:00:00 2001 From: chestnufang Date: Thu, 24 Jul 2025 21:40:13 +0800 Subject: [PATCH 2/2] fix e2e unit case test --- .../src/test/resources/ddl/inventory.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql index 68cc789b083..c823ac5a3db 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql @@ -286,7 +286,7 @@ INSERT INTO postgres_cdc_table_4 VALUES (1, '2', 32767, INTERVAL '2 days 3 hours'); INSERT INTO postgres_cdc_table_5 (id, f_bytea, f_small, f_interval, ip, network, mac, mac8) -VALUES (1, '2', 32767, INTERVAL '1 day 2 hours', '192.168.1.100', '192.168.1.0/24', '08:00:2b:01:02:03', '08:00:2b:01:02:03:04:05') +VALUES (1, '2', 32767, INTERVAL '1 day 2 hours', '192.168.1.100', '192.168.1.0/24', '08:00:2b:01:02:03', '08:00:2b:01:02:03:04:05'); INSERT INTO full_types_no_primary_key VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,