Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/en/connector-v2/sink/PostgreSql.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ semantics (using XA transaction guarantee).
| Datasource | Supported Versions | Driver | Url | Maven |
|------------|------------------------------------------------------------|-----------------------|---------------------------------------|--------------------------------------------------------------------------|
| PostgreSQL | Different dependency version has different driver class. | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | [Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
| PostgreSQL | If you want to manipulate the GEOMETRY type in PostgreSQL. | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | [Download](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc) |
| PostgreSQL | If you want to manipulate the GEOMETRY/GEOGRAPHY type in PostgreSQL. | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | [Download](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc) |

## Database Dependency

Expand Down Expand Up @@ -275,4 +275,4 @@ sink {

## Changelog

<ChangeLog />
<ChangeLog />
2 changes: 1 addition & 1 deletion docs/en/connector-v2/source/PostgreSQL-CDC.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ describes how to set up the Postgre CDC connector to run SQL queries against Pos
| Datasource | Supported versions | Driver | Url | Maven |
|------------|------------------------------------------------------------|-----------------------|---------------------------------------|--------------------------------------------------------------------------|
| PostgreSQL | Different dependency version has different driver class. | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | [Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
| PostgreSQL | If you want to manipulate the GEOMETRY type in PostgreSQL. | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | [Download](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc) |
| PostgreSQL | If you want to manipulate the GEOMETRY/GEOGRAPHY type in PostgreSQL. | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | [Download](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc) |

## Using Dependency

Expand Down
4 changes: 2 additions & 2 deletions docs/zh/connector-v2/source/PostgreSQL-CDC.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Postgre CDC 连接器允许从 Postgre 数据库读取快照数据和增量数
| 数据源 | 支持的版本 | 驱动 | Url | Maven |
|------------|-----------------------------------------------------|---------------------|---------------------------------------|--------------------------------------------------------------------------|
| PostgreSQL | 不同的依赖版本有不同的驱动类。 | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | [下载](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
| PostgreSQL | 如果您想在 PostgreSQL 中操作 GEOMETRY 类型。 | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | [下载](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc) |
| PostgreSQL | 如果您想在 PostgreSQL 中操作 GEOMETRY/GEOGRAPHY 类型。 | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | [下载](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc) |

## 使用依赖

Expand Down Expand Up @@ -190,4 +190,4 @@ source {

## 变更日志

<ChangeLog />
<ChangeLog />
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;
import io.debezium.time.MicroTime;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.NanoTime;
Expand Down Expand Up @@ -458,11 +460,38 @@ private static DebeziumDeserializationConverter convertToString() {

@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj == null) {
return null;
}

if (schema != null && schema.name() != null && dbzObj instanceof Struct) {
String logicalName = schema.name();
if (Geometry.LOGICAL_NAME.equals(logicalName)
|| Geography.LOGICAL_NAME.equals(logicalName)) {
return convertGeometryStructToHexWkb((Struct) dbzObj);
}
}

return dbzObj.toString();
}
};
}

private static String convertGeometryStructToHexWkb(Struct struct) {
Object wkbField = struct.get(Geometry.WKB_FIELD);
if (!(wkbField instanceof byte[])) {
// Fallback to default string representation if the expected field is not present.
return struct.toString();
}

byte[] wkb = (byte[]) wkbField;
StringBuilder sb = new StringBuilder(wkb.length * 2);
for (byte b : wkb) {
sb.append(String.format("%02X", b));
}
return sb.toString();
}

private static DebeziumDeserializationConverter convertToBinary() {
return new DebeziumDeserializationConverter() {
private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import io.debezium.data.geometry.Geography;
import io.debezium.data.geometry.Geometry;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -127,4 +130,80 @@ void testArrayConverter() throws Exception {
Arrays.equals(
doubles, (Double[]) (converter.convert(Arrays.asList(doubles), null))));
}

@Test
void testGeometryStringConversion() throws Exception {
SeaTunnelRowDebeziumDeserializationConverters converters =
new SeaTunnelRowDebeziumDeserializationConverters(
new SeaTunnelRowType(
new String[] {"geo"},
new SeaTunnelDataType[] {BasicType.STRING_TYPE}),
new MetadataConverter[] {},
ZoneId.systemDefault(),
DebeziumDeserializationConverterFactory.DEFAULT);

byte[] wkb = new byte[] {0x01, 0x02, (byte) 0xFF};
Schema geometrySchema = Geometry.builder().optional().build();
Schema recordSchema = SchemaBuilder.struct().field("geo", geometrySchema).build();

Struct geometryValue = Geometry.createValue(geometrySchema, wkb, 4549);
Struct recordValue = new Struct(recordSchema);
recordValue.put("geo", geometryValue);

SourceRecord record =
new SourceRecord(
new HashMap<>(),
new HashMap<>(),
"topicName",
null,
SchemaBuilder.int32().build(),
1,
recordSchema,
recordValue,
null,
new ArrayList<>());

SeaTunnelRow row = converters.convert(record, recordValue, recordSchema);
Object fieldValue = row.getField(0);
Assertions.assertTrue(fieldValue instanceof String);
Assertions.assertEquals("0102FF", fieldValue);
}

@Test
void testGeographyStringConversion() throws Exception {
SeaTunnelRowDebeziumDeserializationConverters converters =
new SeaTunnelRowDebeziumDeserializationConverters(
new SeaTunnelRowType(
new String[] {"geo"},
new SeaTunnelDataType[] {BasicType.STRING_TYPE}),
new MetadataConverter[] {},
ZoneId.systemDefault(),
DebeziumDeserializationConverterFactory.DEFAULT);

byte[] wkb = new byte[] {0x01, 0x02, (byte) 0xFF};
Schema geographySchema = Geography.builder().optional().build();
Schema recordSchema = SchemaBuilder.struct().field("geo", geographySchema).build();

Struct geographyValue = Geometry.createValue(geographySchema, wkb, 4549);
Struct recordValue = new Struct(recordSchema);
recordValue.put("geo", geographyValue);

SourceRecord record =
new SourceRecord(
new HashMap<>(),
new HashMap<>(),
"topicName",
null,
SchemaBuilder.int32().build(),
1,
recordSchema,
recordValue,
null,
new ArrayList<>());

SeaTunnelRow row = converters.convert(record, recordValue, recordSchema);
Object fieldValue = row.getField(0);
Assertions.assertTrue(fieldValue instanceof String);
Assertions.assertEquals("0102FF", fieldValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import java.util.Optional;

import static org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_CIDR;
import static org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_GEOGRAPHY;
import static org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_GEOMETRY;
import static org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_INET;
import static org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_INTERVAL;
import static org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_MAC_ADDR;
Expand All @@ -66,9 +68,6 @@
@Slf4j
public class PostgresJdbcRowConverter extends AbstractJdbcRowConverter {

private static final String PG_GEOMETRY = "GEOMETRY";
private static final String PG_GEOGRAPHY = "GEOGRAPHY";

@Override
public String converterName() {
return DatabaseIdentifier.POSTGRESQL;
Expand Down Expand Up @@ -108,8 +107,8 @@ public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) throws SQL
rs.getMetaData().getColumnTypeName(resultSetIndex).toUpperCase(Locale.ROOT);
switch (seaTunnelDataType.getSqlType()) {
case STRING:
if (metaDataColumnType.equals(PG_GEOMETRY)
|| metaDataColumnType.equals(PG_GEOGRAPHY)) {
if (PG_GEOMETRY.equalsIgnoreCase(metaDataColumnType)
|| PG_GEOGRAPHY.equalsIgnoreCase(metaDataColumnType)) {
Object geoObj = rs.getObject(resultSetIndex);
fields[fieldIndex] = geoObj == null ? null : geoObj.toString();
} else {
Expand Down Expand Up @@ -221,8 +220,18 @@ public PreparedStatement toExternal(

switch (seaTunnelDataType.getSqlType()) {
case STRING:
String sourceType = sourceTypes[fieldIndex];
if (PG_INET.equalsIgnoreCase(sourceType)
String sourceType =
resolveSourceType(
rowType, fieldIndex, databaseTableSchema, sourceTypes);
if (sourceType != null
&& (PG_GEOMETRY.equalsIgnoreCase(sourceType)
|| PG_GEOGRAPHY.equalsIgnoreCase(sourceType))) {
// handle PostGIS geometry/geography when represented as string
PGobject geometryObject = new PGobject();
geometryObject.setType(sourceType.toLowerCase(Locale.ROOT));
geometryObject.setValue((String) row.getField(fieldIndex));
statement.setObject(statementIndex, geometryObject);
} else if (PG_INET.equalsIgnoreCase(sourceType)
|| PG_CIDR.equalsIgnoreCase(sourceType)
|| PG_MAC_ADDR.equalsIgnoreCase(sourceType)
|| PG_MAC_ADDR8.equalsIgnoreCase(sourceType)) {
Expand Down Expand Up @@ -290,7 +299,8 @@ public PreparedStatement toExternal(
statement,
seaTunnelDataType,
statementIndex,
sourceTypes[fieldIndex]);
resolveSourceType(
rowType, fieldIndex, databaseTableSchema, sourceTypes));
break;
case BYTES:
statement.setBytes(statementIndex, (byte[]) row.getField(fieldIndex));
Expand Down Expand Up @@ -333,6 +343,23 @@ public PreparedStatement toExternal(
return statement;
}

@Nullable private String resolveSourceType(
SeaTunnelRowType rowType,
int fieldIndex,
@Nullable TableSchema databaseTableSchema,
String[] sourceTypes) {
if (databaseTableSchema != null) {
String fieldName = rowType.getFieldName(fieldIndex);
if (databaseTableSchema.contains(fieldName)) {
return databaseTableSchema.getColumn(fieldName).getSourceType();
}
}
if (fieldIndex < sourceTypes.length) {
return sourceTypes[fieldIndex];
}
return null;
}

public String microsecondsToIntervalFormatVal(String intervalVal) {
Duration duration = Duration.ofNanos(Long.parseLong(intervalVal) * 1000);
int days = (int) duration.toDays();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ public class PostgresTypeConverter implements TypeConverter<BasicTypeDefine> {
public static final String PG_JSONB = "jsonb";
public static final String PG_XML = "xml";
public static final String PG_UUID = "uuid";
private static final String PG_GEOMETRY = "geometry";
private static final String PG_GEOGRAPHY = "geography";
public static final String PG_GEOMETRY = "geometry";
public static final String PG_GEOGRAPHY = "geography";
public static final String PG_DATE = "date";
public static final String PG_INTERVAL = "interval";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.postgresql.util.PGobject;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
Expand All @@ -38,7 +40,9 @@
import java.util.ArrayList;
import java.util.List;

import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class PostgresJdbcRowConverterTest {
Expand Down Expand Up @@ -185,4 +189,95 @@ public void testToInternalWithNullGeometryType() throws SQLException {
Assertions.assertEquals(1, row.getField(0));
Assertions.assertNull(row.getField(1), "geometry_col should be null");
}

@Test
public void testToExternalWithGeometryType() throws SQLException {
TableSchema tableSchema =
createTableSchema("geometry_col", BasicType.STRING_TYPE, "geometry");

SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "0102FF"});
PreparedStatement statement = mock(PreparedStatement.class);

converter.toExternal(tableSchema, null, row, statement);

ArgumentCaptor<Object> captor = ArgumentCaptor.forClass(Object.class);
verify(statement).setObject(eq(2), captor.capture());

Object arg = captor.getValue();
Assertions.assertTrue(arg instanceof PGobject);
PGobject pg = (PGobject) arg;
Assertions.assertEquals("geometry", pg.getType());
Assertions.assertEquals("0102FF", pg.getValue());
}

@Test
public void testToExternalWithGeometryTypeFromDatabaseSchema() throws SQLException {
TableSchema writeSchema = createTableSchema("geometry_col", BasicType.STRING_TYPE, null);
TableSchema databaseSchema =
createTableSchema("geometry_col", BasicType.STRING_TYPE, "geometry");

SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "0102FF"});
PreparedStatement statement = mock(PreparedStatement.class);

converter.toExternal(writeSchema, databaseSchema, row, statement);

ArgumentCaptor<Object> captor = ArgumentCaptor.forClass(Object.class);
verify(statement).setObject(eq(2), captor.capture());

Object arg = captor.getValue();
Assertions.assertTrue(arg instanceof PGobject);
PGobject pg = (PGobject) arg;
Assertions.assertEquals("geometry", pg.getType());
Assertions.assertEquals("0102FF", pg.getValue());
}

@Test
public void testToInternalWithGeographyType() throws SQLException {
ResultSet rs = mock(ResultSet.class);
TableSchema tableSchema =
createTableSchema("geography_col", BasicType.STRING_TYPE, "GEOGRAPHY");

setupMockResultSet(rs, "INT4", "GEOGRAPHY", 1, "POINT(1 2)");

SeaTunnelRow row = converter.toInternal(rs, tableSchema);

Assertions.assertNotNull(row);
Assertions.assertEquals(1, row.getField(0));
Assertions.assertEquals("POINT(1 2)", row.getField(1));
}

@Test
public void testToInternalWithNullGeographyType() throws SQLException {
ResultSet rs = mock(ResultSet.class);
TableSchema tableSchema =
createTableSchema("geography_col", BasicType.STRING_TYPE, "GEOGRAPHY");

setupMockResultSet(rs, "INT4", "GEOGRAPHY", 1, null);

SeaTunnelRow row = converter.toInternal(rs, tableSchema);

Assertions.assertNotNull(row);
Assertions.assertEquals(1, row.getField(0));
Assertions.assertNull(row.getField(1), "geography_col should be null");
}

@Test
public void testToExternalWithGeographyType() throws SQLException {
TableSchema tableSchema =
createTableSchema("geography_col", BasicType.STRING_TYPE, "geography");

SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "0102FF"});
PreparedStatement statement = mock(PreparedStatement.class);

converter.toExternal(tableSchema, null, row, statement);

ArgumentCaptor<Object> captor = ArgumentCaptor.forClass(Object.class);
verify(statement).setObject(eq(2), captor.capture());

Object arg = captor.getValue();
Assertions.assertTrue(arg instanceof PGobject);
PGobject pg = (PGobject) arg;
Assertions.assertEquals("geography", pg.getType());
Assertions.assertEquals("0102FF", pg.getValue());
}
}
Loading