Skip to content

Conversation

@ekawinataa
Copy link
Member

No description provided.

@ekawinataa ekawinataa changed the title add kafka security module Add Consumer and Sink SASL Related Config Sep 3, 2024
@ekawinataa ekawinataa changed the title Add Consumer and Sink SASL Related Config Add Consumer and Sink Kafka SASL Related Config Sep 3, 2024
@ekawinataa ekawinataa changed the title Add Consumer and Sink Kafka SASL Related Config Add dynamic source and sink kafka properties Sep 5, 2024
@ekawinataa
Copy link
Member Author

Producer configs :
image
image
image

sumitaich1998
sumitaich1998 previously approved these changes Sep 12, 2024
@sumitaich1998 sumitaich1998 dismissed their stale review September 12, 2024 10:29

some changes needed

sumitaich1998
sumitaich1998 previously approved these changes Sep 12, 2024
@sumitaich1998 sumitaich1998 dismissed their stale review September 12, 2024 10:40

some changes needed

Copy link

@sumitaich1998 sumitaich1998 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add unit tests for DaggerKafkaConsumerAdditionalConfigurationsAdaptor and KafkaConfigUtil

Comment on lines 36 to 41
try {
daggerKafkaConsumerAdditionalConfigurationsAdaptor.read(jsonReader);
} catch (IllegalArgumentException e) {
assertEquals("Invalid additional kafka consumer configuration properties found: [INVALID_KEY]", e.getMessage());
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of try catch use expected -
@Test(expected = IllegalArgumentException.class) public void shouldThrowExceptionForInvalidProperties() throws IOException { String input = "{\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_1\":\"value1\",\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_2\":\"value2\",\"INVALID_KEY\":\"value3\"}"; JsonReader jsonReader = new JsonReader(new StringReader(input)); DaggerKafkaConsumerAdditionalConfigurationsAdaptor daggerKafkaConsumerAdditionalConfigurationsAdaptor = new DaggerKafkaConsumerAdditionalConfigurationsAdaptor(); daggerKafkaConsumerAdditionalConfigurationsAdaptor.read(jsonReader); }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intention is to assert the exception message as well, but let me update it

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ekawinataa check if (expected = xyz, exception-message = abc) "exception-message" or some other property can be used to assert this, but if the exception message is dynamic we can't use this.

But yes, its good to assert the exception message. So verify it, if not possible keep as is.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use rule for exception message assertions -

@Rule
public ExpectedException thrown = ExpectedException.none();
@Test
public void shouldThrowExceptionForInvalidProperties() throws IOException {
String input = "{\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_1\":\"value1\",\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_2\":\"value2\",\"INVALID_KEY\":\"value3\"}";
JsonReader jsonReader = new JsonReader(new StringReader(input));
DaggerKafkaConsumerAdditionalConfigurationsAdaptor daggerKafkaConsumerAdditionalConfigurationsAdaptor = new DaggerKafkaConsumerAdditionalConfigurationsAdaptor();
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid additional kafka consumer configuration properties found: [INVALID_KEY]");
daggerKafkaConsumerAdditionalConfigurationsAdaptor.read(jsonReader);
}```

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rajuGT @sumitaich1998 okay, I've checked the current Test annotation and there is no option to assert the message. Let's keep the message assertion, I've added explicit fail in case the expected exception is not thrown. Thanks

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no that is not the right way, please use rule to assert message like I've shown above

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated as per discussion @sumitaich1998

Comment on lines 30 to 36
@Test(expected = IllegalArgumentException.class)
public void shouldThrowExceptionForInvalidProperties() throws IOException {
String input = "{\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_1\":\"value1\",\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_2\":\"value2\",\"INVALID_KEY\":\"value3\"}";
JsonReader jsonReader = new JsonReader(new StringReader(input));
DaggerKafkaConsumerAdditionalConfigurationsAdaptor daggerKafkaConsumerAdditionalConfigurationsAdaptor = new DaggerKafkaConsumerAdditionalConfigurationsAdaptor();

try {
daggerKafkaConsumerAdditionalConfigurationsAdaptor.read(jsonReader);
} catch (IllegalArgumentException e) {
assertEquals("Invalid additional kafka consumer configuration properties found: [INVALID_KEY]", e.getMessage());
}
daggerKafkaConsumerAdditionalConfigurationsAdaptor.read(jsonReader);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use rule for exception message assertions -

@Rule
public ExpectedException thrown = ExpectedException.none();
@Test
public void shouldThrowExceptionForInvalidProperties() throws IOException {
String input = "{\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_1\":\"value1\",\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY_2\":\"value2\",\"INVALID_KEY\":\"value3\"}";
JsonReader jsonReader = new JsonReader(new StringReader(input));
DaggerKafkaConsumerAdditionalConfigurationsAdaptor daggerKafkaConsumerAdditionalConfigurationsAdaptor = new DaggerKafkaConsumerAdditionalConfigurationsAdaptor();
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Invalid additional kafka consumer configuration properties found: [INVALID_KEY]");
daggerKafkaConsumerAdditionalConfigurationsAdaptor.read(jsonReader);
}```

Comment on lines +15 to +16
public class DaggerKafkaConsumerAdditionalConfigurationsAdaptorTest {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add more unit tests covering all possible edge cases , few I've written below -

    @Test
    public void shouldHandleEmptyMap() throws IOException {
        String input = "{}";
        JsonReader jsonReader = new JsonReader(new StringReader(input));
        
        Map<String, String> result = adaptor.read(jsonReader);
        
        assertTrue(result.isEmpty());
    }

    @Test
    public void shouldHandleSingleValidProperty() throws IOException {
        String input = "{\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY\":\"value\"}";
        JsonReader jsonReader = new JsonReader(new StringReader(input));
        
        Map<String, String> result = adaptor.read(jsonReader);
        
        assertEquals(1, result.size());
        assertEquals("value", result.get("SOURCE_KAFKA_CONSUMER_CONFIG_KEY"));
    }

    @Test(expected = IllegalArgumentException.class)
    public void shouldThrowExceptionForAllInvalidProperties() throws IOException {
        String input = "{\"INVALID_KEY_1\":\"value1\",\"INVALID_KEY_2\":\"value2\"}";
        JsonReader jsonReader = new JsonReader(new StringReader(input));
        
        adaptor.read(jsonReader);
    }

    @Test
    public void shouldIgnoreNullValues() throws IOException {
        String input = "{\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY\":null}";
        JsonReader jsonReader = new JsonReader(new StringReader(input));
        
        Map<String, String> result = adaptor.read(jsonReader);
        
        assertTrue(result.isEmpty());
    }

    @Test
    public void shouldHandleMixOfValidAndInvalidProperties() throws IOException {
        String input = "{\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY\":\"value\",\"INVALID_KEY\":\"invalid\"}";
        JsonReader jsonReader = new JsonReader(new StringReader(input));
        
        try {
            adaptor.read(jsonReader);
            fail("Expected IllegalArgumentException");
        } catch (IllegalArgumentException e) {
            assertTrue(e.getMessage().contains("INVALID_KEY"));
        }
    }

    @Test
    public void shouldWriteMapToJsonString() throws IOException {
        Map<String, String> map = new HashMap<>();
        map.put("SOURCE_KAFKA_CONSUMER_CONFIG_KEY", "value");
        StringWriter stringWriter = new StringWriter();
        JsonWriter jsonWriter = new JsonWriter(stringWriter);
        
        adaptor.write(jsonWriter, map);
        
        String result = stringWriter.toString();
        assertTrue(result.contains("\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY\":\"value\""));
    }

    @Test
    public void shouldHandleSpecialCharactersInValues() throws IOException {
        String input = "{\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY\":\"value with spaces and $pecial ch@racters\"}";
        JsonReader jsonReader = new JsonReader(new StringReader(input));
        
        Map<String, String> result = adaptor.read(jsonReader);
        
        assertEquals("value with spaces and $pecial ch@racters", result.get("SOURCE_KAFKA_CONSUMER_CONFIG_KEY"));
    }

    @Test
    public void shouldHandleNumericValues() throws IOException {
        String input = "{\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY\":\"123\"}";
        JsonReader jsonReader = new JsonReader(new StringReader(input));
        
        Map<String, String> result = adaptor.read(jsonReader);
        
        assertEquals("123", result.get("SOURCE_KAFKA_CONSUMER_CONFIG_KEY"));
    }

    @Test
    public void shouldHandleBooleanValues() throws IOException {
        String input = "{\"SOURCE_KAFKA_CONSUMER_CONFIG_KEY\":\"true\"}";
        JsonReader jsonReader = new JsonReader(new StringReader(input));
        
        Map<String, String> result = adaptor.read(jsonReader);
        
        assertEquals("true", result.get("SOURCE_KAFKA_CONSUMER_CONFIG_KEY"));
    }

    @Test
    public void shouldHandleCaseInsensitiveKeys() throws IOException {
        String input = "{\"source_kafka_consumer_config_key\":\"value\"}";
        JsonReader jsonReader = new JsonReader(new StringReader(input));
        
        try {
            adaptor.read(jsonReader);
            fail("Expected IllegalArgumentException");
        } catch (IllegalArgumentException e) {
            assertTrue(e.getMessage().contains("source_kafka_consumer_config_key"));
        }
    }

Comment on lines +10 to +11
public class KafkaConfigUtilTest {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add more unit tests covering all possible edge cases in the pr

    @Test
    public void shouldReturnEmptyPropertiesWhenInputIsEmpty() {
        Properties properties = new Properties();
        Properties kafkaProperties = KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SOURCE, properties);

        assertEquals(0, kafkaProperties.size());
    }

    @Test
    public void shouldReturnEmptyPropertiesWhenAllKeysAreInvalid() {
        Properties properties = new Properties();
        properties.put("INVALID_KEY_1", "value1");
        properties.put("INVALID_KEY_2", "value2");
        properties.put("ANOTHER_INVALID_KEY", "value3");

        Properties kafkaProperties = KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SOURCE, properties);

        assertEquals(0, kafkaProperties.size());
    }

    @Test
    public void shouldParseOnlyValidKeysWhenMixedWithInvalidOnes() {
        Properties properties = new Properties();
        properties.put("SOURCE_KAFKA_CONSUMER_CONFIG_KEY_1", "value1");
        properties.put("INVALID_KEY", "value2");
        properties.put("SOURCE_KAFKA_CONSUMER_CONFIG_KEY_2", "value3");
        properties.put("ANOTHER_INVALID_KEY", "value4");

        Properties kafkaProperties = KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SOURCE, properties);

        assertEquals(2, kafkaProperties.size());
        assertEquals("value1", kafkaProperties.getProperty("key.1"));
        assertEquals("value3", kafkaProperties.getProperty("key.2"));
    }

    @Test
    public void shouldParseCaseInsensitiveKeys() {
        Properties properties = new Properties();
        properties.put("source_kafka_consumer_config_KEY_1", "value1");
        properties.put("SOURCE_KAFKA_CONSUMER_CONFIG_key_2", "value2");

        Properties kafkaProperties = KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SOURCE, properties);

        assertEquals(2, kafkaProperties.size());
        assertEquals("value1", kafkaProperties.getProperty("key.1"));
        assertEquals("value2", kafkaProperties.getProperty("key.2"));
    }

    @Test
    public void shouldParseKeysWithMultipleUnderscores() {
        Properties properties = new Properties();
        properties.put("SOURCE_KAFKA_CONSUMER_CONFIG_MULTI_WORD_KEY", "value1");
        properties.put("SOURCE_KAFKA_CONSUMER_CONFIG_ANOTHER_MULTI_WORD_KEY", "value2");

        Properties kafkaProperties = KafkaConfigUtil.parseKafkaConfiguration(KafkaConnectorTypesMetadata.SOURCE, properties);

        assertEquals(2, kafkaProperties.size());
        assertEquals("value1", kafkaProperties.getProperty("multi.word.key"));
        assertEquals("value2", kafkaProperties.getProperty("another.multi.word.key"));
    }

@ekawinataa ekawinataa changed the title Add dynamic source and sink kafka properties feat: Add dynamic source and sink kafka properties Sep 16, 2024
@sumitaich1998 sumitaich1998 merged commit d7b2709 into main Sep 16, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants