Commit a527556

add feature to be able to configure filter document for replace one model
Introduced a write model filter strategy to support different use cases. Besides the default option there is now a new filter strategy which allows working with unique business keys while still having e.g. a MongoDB ObjectId inserted once, during the first upsert per document. This strategy only behaves correctly if the business key fields (expressed by means of the PartialValueStrategy for the _id field) are guaranteed to be unique in the original Kafka records.

See the config option mongodb.replace.one.strategy with two pre-defined strategy options:
- at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.ReplaceOneDefaultFilterStrategy
- at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.ReplaceOneBusinessKeyFilterStrategy
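To make the difference between the two strategies concrete, here is a minimal plain-Java sketch of the filter and replacement documents each one would hand to a replaceOne. It is an illustration under assumptions, not the connector's code: nested Maps stand in for BsonDocument, the sample record and all class and method names are hypothetical, and the default behaviour is inferred from the code this commit removes from MongoDbSinkTask, since the source of ReplaceOneDefaultFilterStrategy is not shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of the two filter strategies; Maps stand in for BsonDocument.
final class FilterShapeSketch {

    /** The two inputs of a replaceOne: filter document and replacement document. */
    static final class ReplaceOne {
        final Map<String, Object> filter;
        final Map<String, Object> replacement;

        ReplaceOne(Map<String, Object> filter, Map<String, Object> replacement) {
            this.filter = filter;
            this.replacement = replacement;
        }
    }

    // Default behaviour (assumed from the removed MongoDbSinkTask code):
    // filter on the whole _id value, replace with the full value document.
    static ReplaceOne defaultFilter(Map<String, Object> valueDoc) {
        Map<String, Object> filter = new LinkedHashMap<>();
        filter.put("_id", valueDoc.get("_id"));
        return new ReplaceOne(filter, new LinkedHashMap<>(valueDoc));
    }

    // Business-key behaviour: the _id sub-document becomes the filter and
    // _id is dropped from the replacement, so the database can assign its own _id.
    @SuppressWarnings("unchecked")
    static ReplaceOne businessKeyFilter(Map<String, Object> valueDoc) {
        Object businessKey = valueDoc.get("_id");
        if (!(businessKey instanceof Map)) {
            throw new IllegalArgumentException(
                    "_id must be a sub-document holding the business key fields");
        }
        // Unlike the strategy in this commit, the sketch copies instead of mutating the input.
        Map<String, Object> replacement = new LinkedHashMap<>(valueDoc);
        replacement.remove("_id");
        return new ReplaceOne(new LinkedHashMap<>((Map<String, Object>) businessKey), replacement);
    }

    // Hypothetical record: _id carries the business key projected from the value.
    static Map<String, Object> sampleRecord() {
        Map<String, Object> businessKey = new LinkedHashMap<>();
        businessKey.put("customerId", 42);
        Map<String, Object> valueDoc = new LinkedHashMap<>();
        valueDoc.put("_id", businessKey);
        valueDoc.put("customerId", 42);
        valueDoc.put("name", "Ada");
        return valueDoc;
    }

    public static void main(String[] args) {
        ReplaceOne byId = defaultFilter(sampleRecord());
        ReplaceOne byKey = businessKeyFilter(sampleRecord());
        System.out.println(byId.filter + " -> " + byId.replacement);
        System.out.println(byKey.filter + " -> " + byKey.replacement);
    }
}
```

With the default strategy the _id stored in MongoDB is the business key document itself. With the business key strategy the same fields are only used for matching, which is why they must be unique across the records.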
1 parent cf9ffb8 commit a527556

8 files changed: 162 additions and 35 deletions

README.md

Lines changed: 18 additions & 17 deletions
@@ -404,23 +404,24 @@ Data is written using acknowledged writes and the configured write concern level
404404

405405
At the moment the following settings can be configured by means of the *connector.properties* file. For a config file containing default settings see [this example](https://github.com/hpgrahsl/kafka-connect-mongodb/blob/master/config/MongoDbSinkConnector.properties).
406406

407-
| Name | Description | Type | Default | Valid Values | Importance |
408-
|-------------------------------------|----------------------------------------------------------------------------------------|---------|-----------------------------------------------------------------------|------------------------------|------------|
409-
| mongodb.collection | single sink collection name to write to | string | kafkatopic | | high |
410-
| mongodb.connection.uri | the mongodb connection URI as supported by the official drivers | string | mongodb://localhost:27017/kafkaconnect?w=1&journal=true | | high |
411-
| mongodb.document.id.strategy | class name of strategy to use for generating a unique document id (_id) | string | at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy | | high |
412-
| mongodb.delete.on.null.values | whether or not the connector tries to delete documents based on key when value is null | boolean | false | | medium |
413-
| mongodb.max.num.retries | how often a retry should be done on write errors | int | 3 | [0,...] | medium |
414-
| mongodb.retries.defer.timeout | how long in ms a retry should get deferred | int | 5000 | [0,...] | medium |
415-
| mongodb.change.data.capture.handler | class name of CDC handler to use for processing | string | "" | | low |
416-
| mongodb.document.id.strategies | comma separated list of custom strategy classes to register for usage | string | "" | | low |
417-
| mongodb.field.renamer.mapping | inline JSON array with objects describing field name mappings (see docs) | string | [] | | low |
418-
| mongodb.field.renamer.regexp | inline JSON array with objects describing regexp settings (see docs) | string | [] | | low |
419-
| mongodb.key.projection.list | comma separated list of field names for key projection | string | "" | | low |
420-
| mongodb.key.projection.type | whether or not and which key projection to use | string | none | [none, blacklist, whitelist] | low |
421-
| mongodb.post.processor.chain | comma separated list of post processor classes to build the chain with | string | at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder | | low |
422-
| mongodb.value.projection.list | comma separated list of field names for value projection | string | "" | | low |
423-
| mongodb.value.projection.type | whether or not and which value projection to use | string | none | [none, blacklist, whitelist] | low |
407+
| Name | Description | Type | Default | Valid Values | Importance |
408+
|-------------------------------------|----------------------------------------------------------------------------------------|---------|--------------------------------------------------------------------------------------------|------------------------------|------------|
409+
| mongodb.collection | single sink collection name to write to | string | kafkatopic | | high |
410+
| mongodb.connection.uri | the mongodb connection URI as supported by the official drivers | string | mongodb://localhost:27017/kafkaconnect?w=1&journal=true | | high |
411+
| mongodb.document.id.strategy | class name of strategy to use for generating a unique document id (_id) | string | at.grahsl.kafka.connect.mongodb.processor.id.strategy.BsonOidStrategy | | high |
412+
| mongodb.delete.on.null.values | whether or not the connector tries to delete documents based on key when value is null | boolean | false | | medium |
413+
| mongodb.max.num.retries | how often a retry should be done on write errors | int | 3 | [0,...] | medium |
414+
| mongodb.retries.defer.timeout | how long in ms a retry should get deferred | int | 5000 | [0,...] | medium |
415+
| mongodb.change.data.capture.handler | class name of CDC handler to use for processing | string | "" | | low |
416+
| mongodb.document.id.strategies | comma separated list of custom strategy classes to register for usage | string | "" | | low |
417+
| mongodb.field.renamer.mapping | inline JSON array with objects describing field name mappings (see docs) | string | [] | | low |
418+
| mongodb.field.renamer.regexp | inline JSON array with objects describing regexp settings (see docs) | string | [] | | low |
419+
| mongodb.key.projection.list | comma separated list of field names for key projection | string | "" | | low |
420+
| mongodb.key.projection.type | whether or not and which key projection to use | string | none | [none, blacklist, whitelist] | low |
421+
| mongodb.post.processor.chain | comma separated list of post processor classes to build the chain with | string | at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder | | low |
422+
| mongodb.replace.one.strategy | how to build the filter doc for the replaceOne write model | string | at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.ReplaceOneDefaultFilterStrategy | | low |
423+
| mongodb.value.projection.list | comma separated list of field names for value projection | string | "" | | low |
424+
| mongodb.value.projection.type | whether or not and which value projection to use | string | none | [none, blacklist, whitelist] | low |
424425

425426
### Running in development
426427

config/MongoDbSinkConnector.properties

Lines changed: 1 addition & 0 deletions
@@ -42,3 +42,4 @@ mongodb.field.renamer.regexp=[]
4242
mongodb.post.processor.chain=at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder
4343
mongodb.change.data.capture.handler=
4444
mongodb.delete.on.null.values=false
45+
mongodb.replace.one.strategy=at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.ReplaceOneDefaultFilterStrategy

src/main/java/at/grahsl/kafka/connect/mongodb/MongoDbSinkConnectorConfig.java

Lines changed: 20 additions & 0 deletions
@@ -26,6 +26,7 @@
2626
import at.grahsl.kafka.connect.mongodb.processor.field.renaming.RegExpSettings;
2727
import at.grahsl.kafka.connect.mongodb.processor.field.renaming.RenameByRegExp;
2828
import at.grahsl.kafka.connect.mongodb.processor.id.strategy.*;
29+
import at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.WriteModelFilterStrategy;
2930
import com.fasterxml.jackson.core.type.TypeReference;
3031
import com.fasterxml.jackson.databind.ObjectMapper;
3132
import com.mongodb.MongoClientURI;
@@ -64,6 +65,7 @@ public enum FieldProjectionTypes {
6465
public static final String MONGODB_POST_PROCESSOR_CHAIN_DEFAULT = "at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder";
6566
public static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DEFAULT = "";
6667
public static final boolean MONGODB_DELETE_ON_NULL_VALUES_DEFAULT = false;
68+
public static final String MONGODB_REPLACE_ONE_STRATEGY_DEFAULT = "at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.ReplaceOneDefaultFilterStrategy";
6769

6870
public static final String MONGODB_CONNECTION_URI_CONF = "mongodb.connection.uri";
6971
private static final String MONGODB_CONNECTION_URI_DOC = "the mongodb connection URI as supported by the official drivers";
@@ -110,6 +112,9 @@ public enum FieldProjectionTypes {
110112
public static final String MONGODB_DELETE_ON_NULL_VALUES = "mongodb.delete.on.null.values";
111113
private static final String MONGODB_DELETE_ON_NULL_VALUES_DOC = "whether or not the connector tries to delete documents based on key when value is null";
112114

115+
public static final String MONGODB_REPLACE_ONE_STRATEGY = "mongodb.replace.one.strategy";
116+
private static final String MONGODB_REPLACE_ONE_STRATEGY_DOC = "how to build the filter doc for the replaceOne write model";
117+
113118
private static ObjectMapper objectMapper = new ObjectMapper();
114119

115120
public MongoDbSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
@@ -137,6 +142,7 @@ public static ConfigDef conf() {
137142
.define(MONGODB_POST_PROCESSOR_CHAIN, Type.STRING, MONGODB_POST_PROCESSOR_CHAIN_DEFAULT, Importance.LOW, MONGODB_POST_PROCESSOR_CHAIN_DOC)
138143
.define(MONGODB_CHANGE_DATA_CAPTURE_HANDLER, Type.STRING, MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DEFAULT, Importance.LOW, MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DOC)
139144
.define(MONGODB_DELETE_ON_NULL_VALUES, Type.BOOLEAN, MONGODB_DELETE_ON_NULL_VALUES_DEFAULT, Importance.MEDIUM, MONGODB_DELETE_ON_NULL_VALUES_DOC)
145+
.define(MONGODB_REPLACE_ONE_STRATEGY, Type.STRING, MONGODB_REPLACE_ONE_STRATEGY_DEFAULT, Importance.LOW, MONGODB_REPLACE_ONE_STRATEGY_DOC)
140146
;
141147
}
142148

@@ -310,6 +316,20 @@ public boolean isDeleteOnNullValues() {
310316
return getBoolean(MONGODB_DELETE_ON_NULL_VALUES);
311317
}
312318

319+
public WriteModelFilterStrategy getReplaceOneFilterStrategy() {
320+
String replaceOneFilterStrategy = getString(MONGODB_REPLACE_ONE_STRATEGY);
321+
try {
322+
return (WriteModelFilterStrategy) Class.forName(replaceOneFilterStrategy)
323+
.getConstructor().newInstance();
324+
} catch (ReflectiveOperationException e) {
325+
throw new ConfigException(MONGODB_REPLACE_ONE_STRATEGY, replaceOneFilterStrategy, e.getMessage());
326+
} catch (ClassCastException e) {
327+
throw new ConfigException("error: specified class "+ replaceOneFilterStrategy
328+
+ " violates the contract since it doesn't implement " +
329+
WriteModelFilterStrategy.class);
330+
}
331+
}
332+
313333
public static Set<String> getPredefinedCdcHandlerClassNames() {
314334
Set<String> cdcHandlers = new HashSet<String>();
315335
cdcHandlers.add(MongoDbHandler.class.getName());
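
The reflective lookup added in getReplaceOneFilterStrategy() can be tried in isolation. The following is a minimal sketch of the same pattern using only the JDK: FilterStrategy and DefaultStrategy are hypothetical stand-ins for WriteModelFilterStrategy and its implementations, and IllegalArgumentException is used in place of Kafka's ConfigException so the example has no dependencies.

```java
// Sketch of loading a strategy class by name and checking its contract.
final class StrategyLoaderSketch {

    // Hypothetical stand-in for WriteModelFilterStrategy.
    public interface FilterStrategy {
        String describe();
    }

    // Hypothetical implementation with the public no-arg constructor the loader relies on.
    public static final class DefaultStrategy implements FilterStrategy {
        public DefaultStrategy() { }

        @Override
        public String describe() {
            return "default";
        }
    }

    static FilterStrategy load(String className) {
        try {
            Object instance = Class.forName(className).getConstructor().newInstance();
            // cast() throws ClassCastException if the class does not implement the interface
            return FilterStrategy.class.cast(instance);
        } catch (ReflectiveOperationException e) {
            // class not found, no public no-arg constructor, or the constructor itself failed
            throw new IllegalArgumentException("cannot instantiate " + className + ": " + e, e);
        } catch (ClassCastException e) {
            throw new IllegalArgumentException(className + " does not implement "
                    + FilterStrategy.class.getName(), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(load(DefaultStrategy.class.getName()).describe());
    }
}
```

Keeping the two failure modes in separate catch blocks lets a misconfigured class name and a class that breaks the interface contract produce distinct error messages.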

src/main/java/at/grahsl/kafka/connect/mongodb/MongoDbSinkTask.java

Lines changed: 20 additions & 18 deletions
@@ -20,11 +20,17 @@
2020
import at.grahsl.kafka.connect.mongodb.converter.SinkConverter;
2121
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
2222
import at.grahsl.kafka.connect.mongodb.processor.PostProcessor;
23-
import com.mongodb.*;
23+
import at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.DeleteOneDefaultFilterStrategy;
24+
import at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.WriteModelFilterStrategy;
25+
import com.mongodb.BulkWriteException;
26+
import com.mongodb.MongoClient;
27+
import com.mongodb.MongoClientURI;
28+
import com.mongodb.MongoException;
2429
import com.mongodb.bulk.BulkWriteResult;
2530
import com.mongodb.client.MongoCollection;
2631
import com.mongodb.client.MongoDatabase;
27-
import com.mongodb.client.model.*;
32+
import com.mongodb.client.model.BulkWriteOptions;
33+
import com.mongodb.client.model.WriteModel;
2834
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2935
import org.apache.kafka.common.TopicPartition;
3036
import org.apache.kafka.connect.errors.ConnectException;
@@ -35,17 +41,17 @@
3541
import org.slf4j.Logger;
3642
import org.slf4j.LoggerFactory;
3743

38-
import java.util.*;
44+
import java.util.ArrayList;
45+
import java.util.Collection;
46+
import java.util.List;
47+
import java.util.Map;
3948
import java.util.stream.Collectors;
4049
import java.util.stream.Stream;
4150

4251
public class MongoDbSinkTask extends SinkTask {
4352

4453
private static Logger logger = LoggerFactory.getLogger(MongoDbSinkTask.class);
4554

46-
private static final UpdateOptions UPDATE_OPTIONS =
47-
new UpdateOptions().upsert(true);
48-
4955
private static final BulkWriteOptions BULK_WRITE_OPTIONS =
5056
new BulkWriteOptions().ordered(false);
5157

@@ -56,6 +62,8 @@ public class MongoDbSinkTask extends SinkTask {
5662
private int deferRetryMs;
5763
private PostProcessor processorChain;
5864
private CdcHandler cdcHandler;
65+
private WriteModelFilterStrategy replaceOneFilterStrategy;
66+
private WriteModelFilterStrategy deleteOneFilterStrategy;
5967

6068
private SinkConverter sinkConverter = new SinkConverter();
6169

@@ -84,6 +92,10 @@ public void start(Map<String, String> props) {
8492
if(sinkConfig.isUsingCdcHandler()) {
8593
cdcHandler = sinkConfig.getCdcHandler();
8694
}
95+
96+
replaceOneFilterStrategy = sinkConfig.getReplaceOneFilterStrategy();
97+
deleteOneFilterStrategy = new DeleteOneDefaultFilterStrategy();
98+
8799
}
88100

89101
@Override
@@ -145,22 +157,12 @@ public void put(Collection<SinkRecord> records) {
145157
SinkDocument doc = sinkConverter.convert(record);
146158
processorChain.process(doc, record);
147159
if(doc.getValueDoc().isPresent()) {
148-
BsonDocument vd = doc.getValueDoc().get();
149-
docsToWrite.add(
150-
new ReplaceOneModel<>(
151-
new BsonDocument(DBCollection.ID_FIELD_NAME,
152-
vd.get(DBCollection.ID_FIELD_NAME)),
153-
vd,
154-
UPDATE_OPTIONS));
160+
docsToWrite.add(replaceOneFilterStrategy.createWriteModel(doc));
155161
}
156162
else {
157163
if(doc.getKeyDoc().isPresent()
158164
&& sinkConfig.isDeleteOnNullValues()) {
159-
BsonDocument kd = doc.getKeyDoc().get();
160-
docsToWrite.add(
161-
new DeleteOneModel<>(
162-
new BsonDocument(DBCollection.ID_FIELD_NAME,
163-
kd)));
165+
docsToWrite.add(deleteOneFilterStrategy.createWriteModel(doc));
164166
}
165167
}
166168
}
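
The branching that remains in put() after this change is small enough to model on its own. The sketch below uses hypothetical stand-ins (Doc for SinkDocument, strings for the write models) and only mirrors the decision logic visible in the diff: a present value leads to a replaceOne, a missing value with a present key leads to a deleteOne only when delete-on-null is enabled, and everything else is skipped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Sketch of the per-record decision between replaceOne, deleteOne and skipping.
final class WriteDispatchSketch {

    // Hypothetical stand-in for SinkDocument: key and value may each be absent.
    static final class Doc {
        private final String key;
        private final String value;

        Doc(String key, String value) {
            this.key = key;
            this.value = value;
        }

        Optional<String> getKeyDoc() {
            return Optional.ofNullable(key);
        }

        Optional<String> getValueDoc() {
            return Optional.ofNullable(value);
        }
    }

    // Strings stand in for the WriteModel instances the strategies would create.
    static List<String> plan(List<Doc> docs, boolean deleteOnNullValues) {
        List<String> docsToWrite = new ArrayList<>();
        for (Doc doc : docs) {
            if (doc.getValueDoc().isPresent()) {
                docsToWrite.add("replaceOne(" + doc.getValueDoc().get() + ")");
            } else if (doc.getKeyDoc().isPresent() && deleteOnNullValues) {
                docsToWrite.add("deleteOne(" + doc.getKeyDoc().get() + ")");
            }
            // records with neither branch applicable produce no write model
        }
        return docsToWrite;
    }

    public static void main(String[] args) {
        List<Doc> docs = new ArrayList<>();
        docs.add(new Doc("k1", "v1"));
        docs.add(new Doc("k2", null));
        System.out.println(plan(docs, true));
        System.out.println(plan(docs, false));
    }
}
```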
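
src/main/java/at/grahsl/kafka/connect/mongodb/writemodel/filter/strategy/DeleteOneDefaultFilterStrategy.java

Lines changed: 23 additions & 0 deletions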
@@ -0,0 +1,23 @@
1+
package at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy;
2+
3+
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
4+
import com.mongodb.DBCollection;
5+
import com.mongodb.client.model.DeleteOneModel;
6+
import com.mongodb.client.model.WriteModel;
7+
import org.apache.kafka.connect.errors.DataException;
8+
import org.bson.BsonDocument;
9+
10+
public class DeleteOneDefaultFilterStrategy implements WriteModelFilterStrategy {
11+
12+
@Override
13+
public WriteModel<BsonDocument> createWriteModel(SinkDocument document) {
14+
15+
BsonDocument kd = document.getKeyDoc().orElseThrow(
16+
() -> new DataException("error: cannot build the WriteModel since"
17+
+ " the key document was missing unexpectedly")
18+
);
19+
20+
return new DeleteOneModel<>(new BsonDocument(DBCollection.ID_FIELD_NAME, kd));
21+
22+
}
23+
}
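
src/main/java/at/grahsl/kafka/connect/mongodb/writemodel/filter/strategy/ReplaceOneBusinessKeyFilterStrategy.java

Lines changed: 38 additions & 0 deletions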
@@ -0,0 +1,38 @@
1+
package at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy;
2+
3+
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
4+
import com.mongodb.DBCollection;
5+
import com.mongodb.client.model.ReplaceOneModel;
6+
import com.mongodb.client.model.UpdateOptions;
7+
import com.mongodb.client.model.WriteModel;
8+
import org.apache.kafka.connect.errors.DataException;
9+
import org.bson.BsonDocument;
10+
import org.bson.BsonValue;
11+
12+
public class ReplaceOneBusinessKeyFilterStrategy implements WriteModelFilterStrategy {
13+
14+
private static final UpdateOptions UPDATE_OPTIONS =
15+
new UpdateOptions().upsert(true);
16+
17+
@Override
18+
public WriteModel<BsonDocument> createWriteModel(SinkDocument document) {
19+
20+
BsonDocument vd = document.getValueDoc().orElseThrow(
21+
() -> new DataException("error: cannot build the WriteModel since"
22+
+ " the value document was missing unexpectedly")
23+
);
24+
25+
BsonValue businessKey = vd.get(DBCollection.ID_FIELD_NAME);
26+
27+
if(!(businessKey instanceof BsonDocument)) {
28+
throw new DataException("error: cannot build the WriteModel since"
29+
+ " the value document does not contain an _id field of type BsonDocument"
30+
+ " which holds the business key fields");
31+
}
32+
33+
vd.remove(DBCollection.ID_FIELD_NAME);
34+
35+
return new ReplaceOneModel<>((BsonDocument)businessKey, vd, UPDATE_OPTIONS);
36+
37+
}
38+
}
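
Why the business key strategy yields an _id that is assigned only once can be seen with a small in-memory simulation. This is a sketch under assumptions, not driver code: a list of Maps stands in for a collection, replaceOneUpsert is a hypothetical method that imitates a replaceOne with upsert enabled, and the "generated-N" strings stand in for ObjectIds the server would create when neither filter nor replacement carries an _id.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory imitation of replaceOne with upsert for replacement documents without _id.
final class UpsertOnceSketch {

    private final List<Map<String, Object>> collection = new ArrayList<>();
    private int nextId = 1; // stand-in for server-side ObjectId generation

    void replaceOneUpsert(Map<String, Object> filter, Map<String, Object> replacement) {
        for (int i = 0; i < collection.size(); i++) {
            Map<String, Object> existing = collection.get(i);
            // a document matches when it contains every field/value pair of the filter
            if (existing.entrySet().containsAll(filter.entrySet())) {
                Map<String, Object> updated = new LinkedHashMap<>();
                updated.put("_id", existing.get("_id")); // a matched document keeps its _id
                updated.putAll(replacement);
                collection.set(i, updated); // only the first match is replaced
                return;
            }
        }
        // no match: insert the replacement with a freshly generated _id
        Map<String, Object> inserted = new LinkedHashMap<>();
        inserted.put("_id", "generated-" + nextId++);
        inserted.putAll(replacement);
        collection.add(inserted);
    }

    List<Map<String, Object>> documents() {
        return collection;
    }

    public static void main(String[] args) {
        UpsertOnceSketch sink = new UpsertOnceSketch();
        Map<String, Object> key = new LinkedHashMap<>();
        key.put("customerId", 42);

        Map<String, Object> first = new LinkedHashMap<>(key);
        first.put("name", "Ada");
        Map<String, Object> second = new LinkedHashMap<>(key);
        second.put("name", "Ada L.");

        sink.replaceOneUpsert(key, first);  // inserts and generates an _id
        sink.replaceOneUpsert(key, second); // replaces, _id stays the same
        System.out.println(sink.documents());
    }
}
```

Because only the first matching document is replaced, business key fields that are not unique would leave other matching documents untouched, which is the caveat the commit message spells out.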
