diff --git a/build.gradle b/build.gradle
index 119a0e1f9443c9..44f4d73d47fd8d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -66,6 +66,10 @@ project.ext.externalDependency = [
   'springCore': 'org.springframework:spring-core:5.1.6.RELEASE',
   'springJdbc': 'org.springframework:spring-jdbc:5.1.6.RELEASE',
   'springWeb': 'org.springframework:spring-web:5.1.6.RELEASE',
+  'springBootStarterWeb': 'org.springframework.boot:spring-boot-starter-web:2.1.2.RELEASE',
+  'springBootStarterJetty': 'org.springframework.boot:spring-boot-starter-jetty:2.1.2.RELEASE',
+  'springKafka': 'org.springframework.kafka:spring-kafka:2.2.3.RELEASE',
+  'springActuator': 'org.springframework.boot:spring-boot-starter-actuator:2.1.2.RELEASE',
   'testng': 'org.testng:testng:6.9.9'
 ]

@@ -102,4 +106,4 @@ subprojects {
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/docker/mce-consumer/Dockerfile b/docker/mce-consumer/Dockerfile
index 40083a240df559..51e9d9116647a6 100644
--- a/docker/mce-consumer/Dockerfile
+++ b/docker/mce-consumer/Dockerfile
@@ -4,9 +4,13 @@ MAINTAINER Kerem Sahin ksahin@linkedin.com
 COPY . datahub-src

 RUN cd datahub-src && ./gradlew :metadata-jobs:mce-consumer-job:build \
-    && cp metadata-jobs/mce-consumer-job/build/distributions/mce-consumer-job.zip ../mce-consumer-job.zip \
-    && cd .. && rm -rf datahub-src && unzip mce-consumer-job.zip
+    && cp metadata-jobs/mce-consumer-job/build/libs/mce-consumer-job.jar ../mce-consumer-job.jar \
+    && cd .. && rm -rf datahub-src

 FROM openjdk:8-jre-alpine
-COPY --from=builder /mce-consumer-job /mce-consumer-job/
+COPY --from=builder /mce-consumer-job.jar /mce-consumer-job.jar
+
+EXPOSE 9090
+
+ENTRYPOINT ["java", "-jar", "mce-consumer-job.jar"]
diff --git a/docker/mce-consumer/docker-compose.yml b/docker/mce-consumer/docker-compose.yml
index 5ef9e86d506223..6d0b8d20139b32 100644
--- a/docker/mce-consumer/docker-compose.yml
+++ b/docker/mce-consumer/docker-compose.yml
@@ -5,13 +5,16 @@ services:
     image: keremsahin/datahub-mce-consumer:latest
     hostname: datahub-mce-consumer
     container_name: datahub-mce-consumer
+    ports:
+      - "9090:9090"
     environment:
       - KAFKA_BOOTSTRAP_SERVER=broker:29092
       - KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
       - GMS_HOST=datahub-gms
       - GMS_PORT=8080
-    command: "sh -c './mce-consumer-job/bin/mce-consumer-job'"
+      - KAFKA_MCE_TOPIC_NAME=MetadataChangeEvent
+      - KAFKA_FMCE_TOPIC_NAME=FailedMetadataChangeEvent

 networks:
   default:
-    name: datahub_network
\ No newline at end of file
+    name: datahub_network
diff --git a/docker/quickstart/docker-compose.yml b/docker/quickstart/docker-compose.yml
index 25e0ef473981f4..ba960e5fd21826 100644
--- a/docker/quickstart/docker-compose.yml
+++ b/docker/quickstart/docker-compose.yml
@@ -241,6 +241,8 @@ services:
     image: keremsahin/datahub-mce-consumer:latest
     hostname: datahub-mce-consumer
     container_name: datahub-mce-consumer
+    ports:
+      - "9090:9090"
     environment:
       - KAFKA_BOOTSTRAP_SERVER=broker:29092
       - KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
@@ -250,7 +252,7 @@
       - kafka-setup
       - datahub-gms
     command: "sh -c 'while ping -c1 kafka-setup &>/dev/null; do echo waiting for kafka-setup... && sleep 1; done; \
-                     echo kafka-setup done! && ./mce-consumer-job/bin/mce-consumer-job'"
+                     echo kafka-setup done! && java -jar mce-consumer-job.jar'"

 networks:
   default:
diff --git a/metadata-jobs/mce-consumer-job/README.md b/metadata-jobs/mce-consumer-job/README.md
index 1d2f2e51a663b7..b7fcd7270d0b2f 100644
--- a/metadata-jobs/mce-consumer-job/README.md
+++ b/metadata-jobs/mce-consumer-job/README.md
@@ -29,5 +29,14 @@ Quickest way to try out `MCE Consumer Job` is running the [Docker image](../../d

 If you do modify things and want to try it out quickly without building the Docker image, you can also run the application directly from command line after a successful [build](#build):
 ```
-./gradlew :metadata-jobs:mce-consumer-job:run
-```
\ No newline at end of file
+./gradlew :metadata-jobs:mce-consumer-job:bootRun
+```
+## Endpoints
+Spring Boot Actuator has been enabled for the MCE application.
+The `health`, `metrics` and `info` web endpoints are enabled by default.
+
+`health` - http://localhost:9090/actuator/health
+`metrics` - http://localhost:9090/actuator/metrics
+
+To retrieve a specific metric - http://localhost:9090/actuator/metrics/kafka.consumer.records.consumed.total
+
diff --git a/metadata-jobs/mce-consumer-job/build.gradle b/metadata-jobs/mce-consumer-job/build.gradle
index 3d5959069018fc..cf97f5ff2b98ab 100644
--- a/metadata-jobs/mce-consumer-job/build.gradle
+++ b/metadata-jobs/mce-consumer-job/build.gradle
@@ -1,5 +1,8 @@
-apply plugin: 'application'
-apply plugin: 'java'
+plugins {
+  id 'org.springframework.boot' version '2.1.2.RELEASE'
+  id 'java'
+}
+
 apply plugin: 'pegasus'

 configurations {
@@ -23,6 +26,13 @@ dependencies {
   compile externalDependency.kafkaAvroSerde
   compile externalDependency.kafkaStreams
   compile externalDependency.kafkaSerializers
+  compile (externalDependency.springBootStarterWeb) {
+    exclude module: "spring-boot-starter-tomcat"
+  }
+  compile externalDependency.springBootStarterJetty
+  compile externalDependency.springKafka
+
+  compile externalDependency.springActuator

   compileOnly externalDependency.lombok

@@ -47,4 +57,6 @@ clean {
   project.delete("src/main/resources/avro")
 }

-mainClassName = 'com.linkedin.metadata.kafka.MceStreamTask'
+bootJar {
+  mainClassName = 'com.linkedin.metadata.kafka.MceConsumerApplication'
+}
diff --git a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceConsumerApplication.java b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceConsumerApplication.java
new file mode 100644
index 00000000000000..d10c980b820e99
--- /dev/null
+++ b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceConsumerApplication.java
@@ -0,0 +1,15 @@
+package com.linkedin.metadata.kafka;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.elasticsearch.rest.RestClientAutoConfiguration;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+
+@SpringBootApplication(exclude = {RestClientAutoConfiguration.class, KafkaAutoConfiguration.class})
+public class MceConsumerApplication {
+
+  public static void main(String[] args) {
+    SpringApplication.run(MceConsumerApplication.class, args);
+  }
+
+}
diff --git a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceStreamTask.java b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceStreamTask.java
deleted file mode 100644
index f45538e776ad4c..00000000000000
--- a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MceStreamTask.java
+++ /dev/null
@@ -1,208 +0,0 @@
-package com.linkedin.metadata.kafka;
-
-import com.linkedin.common.urn.Urn;
-import com.linkedin.data.template.RecordTemplate;
-import com.linkedin.metadata.EventUtils;
-import com.linkedin.metadata.dao.internal.BaseRemoteWriterDAO;
-import com.linkedin.metadata.dao.internal.RestliRemoteWriterDAO;
-import com.linkedin.metadata.dao.utils.ModelUtils;
-import com.linkedin.metadata.dao.utils.RecordUtils;
-import com.linkedin.metadata.restli.DefaultRestliClientFactory;
-import com.linkedin.metadata.snapshot.Snapshot;
-import com.linkedin.mxe.FailedMetadataChangeEvent;
-import com.linkedin.mxe.MetadataChangeEvent;
-import com.linkedin.mxe.Topics;
-import com.linkedin.restli.client.Client;
-import com.linkedin.util.Configuration;
-
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
-import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
-import io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
-import org.apache.kafka.streams.kstream.KStream;
-
-import javax.annotation.Nonnull;
-
-import java.io.IOException;
-import java.util.Properties;
-
-
-@Slf4j
-public class MceStreamTask {
-
-  private static final String DEFAULT_MCE_KAFKA_TOPIC_NAME = Topics.METADATA_CHANGE_EVENT;
-  private static final String DEFAULT_FMCE_KAFKA_TOPIC_NAME = Topics.FAILED_METADATA_CHANGE_EVENT;
-  private static final String DEFAULT_GMS_HOST = "localhost";
-  private static final String DEFAULT_GMS_PORT = "8080";
-  private static final String DEFAULT_KAFKA_BOOTSTRAP_SERVER = "localhost:9092";
-  private static final String DEFAULT_KAFKA_SCHEMAREGISTRY_URL = "http://localhost:8081";
-  private static KafkaProducer producer;
-  private static BaseRemoteWriterDAO _remoteWriterDAO;
-
-  public static void main(final String[] args) {
-    log.info("Creating MCE consumer task");
-    final Client restClient = DefaultRestliClientFactory.getRestLiClient(
-        Configuration.getEnvironmentVariable("GMS_HOST", DEFAULT_GMS_HOST),
-        Integer.parseInt(Configuration.getEnvironmentVariable("GMS_PORT", DEFAULT_GMS_PORT))
-    );
-    _remoteWriterDAO = new RestliRemoteWriterDAO(restClient);
-    log.info("RemoteWriterDAO built successfully");
-
-    // Configure the Streams application.
-    final Properties streamsConfiguration = getStreamsConfiguration();
-
-    // Configure the Kakfa Producer for sending Failed MCE
-    final Properties producerProperties = getProducerProperties();
-
-    // Define the Producer for Sending Failed MCE Events
-    producer = new KafkaProducer<>(producerProperties);
-
-
-    // Define the processing topology of the Streams application.
-    final StreamsBuilder builder = new StreamsBuilder();
-    createProcessingTopology(builder);
-    final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
-
-    // Clean local state prior to starting the processing topology.
-    streams.cleanUp();
-
-    // Now run the processing topology via `start()` to begin processing its input data.
-    streams.start();
-
-    // Add shutdown hook to respond to SIGTERM and gracefully close the Streams application.
-    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
-  }
-
-  /**
-   * Configure the Streams application.
-   *
-   * @return Properties getStreamsConfiguration
-   */
-  static Properties getStreamsConfiguration() {
-    final Properties streamsConfiguration = new Properties();
-    // Give the Streams application a unique name. The name must be unique in the Kafka cluster
-    // against which the application is run.
-    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "mce-consuming-job");
-    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "mce-consuming-job-client");
-    // Where to find Kafka broker(s).
-    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
-        Configuration.getEnvironmentVariable("KAFKA_BOOTSTRAP_SERVER", DEFAULT_KAFKA_BOOTSTRAP_SERVER));
-    // Specify default (de)serializers for record keys and for record values.
-    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
-    streamsConfiguration.put("schema.registry.url",
-        Configuration.getEnvironmentVariable("KAFKA_SCHEMAREGISTRY_URL", DEFAULT_KAFKA_SCHEMAREGISTRY_URL));
-    // Continue handling events after exception
-    streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
-        LogAndContinueExceptionHandler.class);
-    // Records will be flushed every 10 seconds.
-    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.valueOf(10000));
-    // Disable record caches.
-    streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-    return streamsConfiguration;
-  }
-
-  /**
-   * KafkaProducer Properties to produce FailedMetadataChangeEvent
-   *
-   * @return Properties producerConfig
-   */
-  static Properties getProducerProperties() {
-    final Properties producerConfig = new Properties();
-
-    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
-        Configuration.getEnvironmentVariable("KAFKA_BOOTSTRAP_SERVER", DEFAULT_KAFKA_BOOTSTRAP_SERVER));
-    producerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
-        Configuration.getEnvironmentVariable("KAFKA_SCHEMAREGISTRY_URL", DEFAULT_KAFKA_SCHEMAREGISTRY_URL));
-    producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "failed-mce-producer");
-    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GenericAvroSerializer.class);
-
-    return producerConfig;
-  }
-
-  /**
-   * Define the processing topology for job.
-   *
-   * @param builder StreamsBuilder to use
-   */
-  static void createProcessingTopology(final StreamsBuilder builder) {
-    // Construct a `KStream` from the input topic.
-    // The default key and value serdes will be used.
-    final KStream messages = builder.stream(
-        Configuration.getEnvironmentVariable("KAFKA_TOPIC_NAME", DEFAULT_MCE_KAFKA_TOPIC_NAME)
-    );
-    messages.foreach((k, v) -> processSingleMCE(v));
-  }
-
-  /**
-   * Process MCE and write in the underlying DB.
-   *
-   * @param record single MCE message
-   */
-  static void processSingleMCE(final GenericData.Record record) {
-    log.debug("Got MCE");
-    com.linkedin.mxe.MetadataChangeEvent event = new MetadataChangeEvent();
-    try {
-      event = EventUtils.avroToPegasusMCE(record);
-
-      if (event.hasProposedSnapshot()) {
-        processProposedSnapshot(event.getProposedSnapshot());
-      }
-    } catch (Throwable throwable) {
-      log.error("MCE Processor Error", throwable);
-      log.error("Message: {}", record);
-      sendFailedMCE(event, throwable);
-    }
-  }
-
-  /**
-   * Sending Failed MCE Event to Kafka Topic
-   * @param event
-   * @param throwable
-   */
-  private static void sendFailedMCE(@Nonnull MetadataChangeEvent event, @Nonnull Throwable throwable) {
-    final FailedMetadataChangeEvent failedMetadataChangeEvent = createFailedMCEEvent(event, throwable);
-    try {
-      final GenericRecord genericFailedMCERecord = EventUtils.pegasusToAvroFailedMCE(failedMetadataChangeEvent);
-      log.debug("FailedMetadataChangeEvent:"+failedMetadataChangeEvent);
-      producer.send(new ProducerRecord<>(
-          Configuration.getEnvironmentVariable("FAILED_MCE_KAFKA_TOPIC_NAME", DEFAULT_FMCE_KAFKA_TOPIC_NAME),
-          genericFailedMCERecord));
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-  }
-
-  /**
-   * Populate a FailedMetadataChangeEvent from a MCE
-   * @param event
-   * @param throwable
-   * @return FailedMetadataChangeEvent
-   */
-  @Nonnull
-  private static FailedMetadataChangeEvent createFailedMCEEvent(@Nonnull MetadataChangeEvent event, @Nonnull Throwable throwable) {
-    final FailedMetadataChangeEvent fmce = new FailedMetadataChangeEvent();
-    fmce.setError(ExceptionUtils.getStackTrace(throwable));
-    fmce.setMetadataChangeEvent(event);
-    return fmce;
-  }
-
-  static void processProposedSnapshot(@Nonnull Snapshot snapshotUnion) {
-    final RecordTemplate snapshot = RecordUtils.getSelectedRecordTemplateFromUnion(snapshotUnion);
-    final Urn urn = ModelUtils.getUrnFromSnapshotUnion(snapshotUnion);
-    _remoteWriterDAO.create(urn, snapshot);
-  }
-}
diff --git a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java
new file mode 100644
index 00000000000000..3537be502e6b48
--- /dev/null
+++ b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/MetadataChangeEventsProcessor.java
@@ -0,0 +1,102 @@
+package com.linkedin.metadata.kafka;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import javax.annotation.Nonnull;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import com.linkedin.common.urn.Urn;
+import com.linkedin.data.template.RecordTemplate;
+import com.linkedin.metadata.EventUtils;
+import com.linkedin.metadata.dao.internal.BaseRemoteWriterDAO;
+import com.linkedin.metadata.dao.utils.ModelUtils;
+import com.linkedin.metadata.dao.utils.RecordUtils;
+import com.linkedin.metadata.snapshot.Snapshot;
+import com.linkedin.mxe.FailedMetadataChangeEvent;
+import com.linkedin.mxe.MetadataChangeEvent;
+import com.linkedin.mxe.Topics;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Component
+public class MetadataChangeEventsProcessor {
+
+  private BaseRemoteWriterDAO remoteWriterDAO;
+  @Autowired
+  private KafkaProducer kafkaProducer;
+  @Value("${KAFKA_FMCE_TOPIC_NAME:" + Topics.FAILED_METADATA_CHANGE_EVENT + "}")
+  private String fmceTopicName;
+
+  public MetadataChangeEventsProcessor(BaseRemoteWriterDAO remoteWriterDAO) {
+    this.remoteWriterDAO = remoteWriterDAO;
+  }
+
+  public void processSingleMCE(final GenericRecord record) {
+    log.debug("Got MCE");
+    log.debug("Record {}", record);
+
+    MetadataChangeEvent event = new MetadataChangeEvent();
+
+    try {
+      event = EventUtils.avroToPegasusMCE(record);
+      log.debug("MetadataChangeEvent {}", event);
+      if (event.hasProposedSnapshot()) {
+        processProposedSnapshot(event);
+      }
+    } catch (Throwable throwable) {
+      log.error("MCE Processor Error", throwable);
+      log.error("Message: {}", record);
+      sendFailedMCE(event, throwable);
+    }
+  }
+
+  /**
+   * Sending Failed MCE Event to Kafka Topic
+   *
+   * @param event
+   * @param throwable
+   */
+  private void sendFailedMCE(@Nonnull MetadataChangeEvent event, @Nonnull Throwable throwable) {
+    final FailedMetadataChangeEvent failedMetadataChangeEvent = createFailedMCEEvent(event, throwable);
+    try {
+      final GenericRecord genericFailedMCERecord = EventUtils.pegasusToAvroFailedMCE(failedMetadataChangeEvent);
+      log.debug("Sending FailedMessages to topic - {}", fmceTopicName);
+      log.info("Error while processing MCE: FailedMetadataChangeEvent - {}", failedMetadataChangeEvent);
+      this.kafkaProducer.send(new ProducerRecord<>(fmceTopicName, genericFailedMCERecord));
+    } catch (IOException e) {
+      log.error("Error while sending FailedMetadataChangeEvent: Exception - {}, FailedMetadataChangeEvent - {}", e.getStackTrace(), failedMetadataChangeEvent);
+    }
+  }
+
+  /**
+   * Populate a FailedMetadataChangeEvent from a MCE
+   *
+   * @param event
+   * @param throwable
+   * @return FailedMetadataChangeEvent
+   */
+  @Nonnull
+  private FailedMetadataChangeEvent createFailedMCEEvent(@Nonnull MetadataChangeEvent event, @Nonnull Throwable throwable) {
+    final FailedMetadataChangeEvent fmce = new FailedMetadataChangeEvent();
+    fmce.setError(ExceptionUtils.getStackTrace(throwable));
+    fmce.setMetadataChangeEvent(event);
+    return fmce;
+  }
+
+  private void processProposedSnapshot(@Nonnull MetadataChangeEvent metadataChangeEvent) throws URISyntaxException {
+    Snapshot snapshotUnion = metadataChangeEvent.getProposedSnapshot();
+    final RecordTemplate snapshot = RecordUtils.getSelectedRecordTemplateFromUnion(snapshotUnion);
+    final Urn urn = ModelUtils.getUrnFromSnapshotUnion(snapshotUnion);
+    remoteWriterDAO.create(urn, snapshot);
+  }
+
+}
diff --git a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/KafkaProducerConfig.java b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/KafkaProducerConfig.java
new file mode 100644
index 00000000000000..c63f3e057b0433
--- /dev/null
+++ b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/KafkaProducerConfig.java
@@ -0,0 +1,58 @@
+package com.linkedin.metadata.kafka.config;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.ProducerFactory;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer;
+
+@Configuration
+@EnableKafka
+public class KafkaProducerConfig {
+
+  @Value("${KAFKA_BOOTSTRAP_SERVER:localhost:9092}")
+  private String kafkaBootstrapServer;
+  @Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}")
+  private String kafkaSchemaRegistryUrl;
+
+  /**
+   * KafkaProducer Properties to produce FailedMetadataChangeEvent
+   *
+   * @return Properties producerConfig
+   */
+  @Bean
+  public Map getProducerConfig() {
+    final Map producerConfigMap = new HashMap<>();
+
+    producerConfigMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        com.linkedin.util.Configuration.getEnvironmentVariable("KAFKA_BOOTSTRAP_SERVER", kafkaBootstrapServer));
+    producerConfigMap.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
+        com.linkedin.util.Configuration.getEnvironmentVariable("KAFKA_SCHEMAREGISTRY_URL", kafkaSchemaRegistryUrl));
+    producerConfigMap.put(ProducerConfig.CLIENT_ID_CONFIG, "failed-mce-producer");
+    producerConfigMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    producerConfigMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GenericAvroSerializer.class);
+
+    return producerConfigMap;
+  }
+
+  @Bean
+  public ProducerFactory producerFactory() {
+    return new DefaultKafkaProducerFactory<>(getProducerConfig());
+  }
+
+  @Bean
+  public KafkaProducer kafkaProducer() {
+    return new KafkaProducer<>(getProducerConfig());
+  }
+}
diff --git a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/KafkaStreamsConfig.java b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/KafkaStreamsConfig.java
new file mode 100644
index 00000000000000..aa89edf90fb999
--- /dev/null
+++ b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/KafkaStreamsConfig.java
@@ -0,0 +1,75 @@
+package com.linkedin.metadata.kafka.config;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
+import org.apache.kafka.streams.kstream.KStream;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.annotation.EnableKafkaStreams;
+import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
+import org.springframework.kafka.config.KafkaStreamsConfiguration;
+
+import com.linkedin.metadata.kafka.MetadataChangeEventsProcessor;
+import com.linkedin.mxe.Topics;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
+import lombok.extern.slf4j.Slf4j;
+
+@Configuration
+@EnableKafka
+@EnableKafkaStreams
+@Slf4j
+public class KafkaStreamsConfig {
+
+  @Value("${KAFKA_BOOTSTRAP_SERVER:localhost:9092}")
+  private String kafkaBootstrapServer;
+  @Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}")
+  private String kafkaSchemaRegistryUrl;
+  @Value("${KAFKA_MCE_TOPIC_NAME:" + Topics.METADATA_CHANGE_EVENT + "}")
+  private String mceTopicName;
+
+  private final MetadataChangeEventsProcessor eventsProcessor;
+
+  public KafkaStreamsConfig(MetadataChangeEventsProcessor eventsProcessor) {
+    this.eventsProcessor = eventsProcessor;
+  }
+
+  @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
+  public KafkaStreamsConfiguration kStreamsConfigs() {
+    Map props = new HashMap<>();
+    // Give the Streams application a unique name. The name must be unique in the Kafka cluster
+    // against which the application is run.
+    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mce-consuming-job");
+    props.put(StreamsConfig.CLIENT_ID_CONFIG, "mce-consuming-job-client");
+    // Where to find Kafka broker(s).
+    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
+    // Specify default (de)serializers for record keys and for record values.
+    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
+    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);
+    // Continue handling events after exception
+    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
+    // Records will be flushed every 10 seconds.
+    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
+    // Disable record caches.
+    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+    return new KafkaStreamsConfiguration(props);
+  }
+
+  @Bean
+  public KStream kStream(StreamsBuilder kStreamBuilder) {
+    final KStream messages = kStreamBuilder.stream(mceTopicName);
+    messages.foreach((k, v) -> eventsProcessor.processSingleMCE(v));
+    return messages;
+  }
+
+}
diff --git a/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/RemoteWriterConfig.java b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/RemoteWriterConfig.java
new file mode 100644
index 00000000000000..5a53cab2cc325a
--- /dev/null
+++ b/metadata-jobs/mce-consumer-job/src/main/java/com/linkedin/metadata/kafka/config/RemoteWriterConfig.java
@@ -0,0 +1,25 @@
+package com.linkedin.metadata.kafka.config;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import com.linkedin.metadata.dao.internal.BaseRemoteWriterDAO;
+import com.linkedin.metadata.dao.internal.RestliRemoteWriterDAO;
+import com.linkedin.metadata.restli.DefaultRestliClientFactory;
+import com.linkedin.restli.client.Client;
+
+@Configuration
+public class RemoteWriterConfig {
+
+  @Value("${GMS_HOST:localhost}")
+  private String gmsHost;
+  @Value("${GMS_PORT:8080}")
+  private int gmsPort;
+
+  @Bean
+  public BaseRemoteWriterDAO remoteWriterDAO() {
+    Client restClient = DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort);
+    return new RestliRemoteWriterDAO(restClient);
+  }
+}
diff --git a/metadata-jobs/mce-consumer-job/src/main/resources/application.properties b/metadata-jobs/mce-consumer-job/src/main/resources/application.properties
new file mode 100644
index 00000000000000..369a3cf62d415d
--- /dev/null
+++ b/metadata-jobs/mce-consumer-job/src/main/resources/application.properties
@@ -0,0 +1,2 @@
+server.port=9090
+management.endpoints.web.exposure.include=metrics, health, info
diff --git a/metadata-jobs/mce-consumer-job/src/main/resources/logback.xml b/metadata-jobs/mce-consumer-job/src/main/resources/logback.xml
index 1876851b5c18ce..17b9176ed74137 100644
--- a/metadata-jobs/mce-consumer-job/src/main/resources/logback.xml
+++ b/metadata-jobs/mce-consumer-job/src/main/resources/logback.xml
@@ -1,12 +1,30 @@
+
+
-    INFO
     %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+
+    ${LOG_DIR}/mce-consumer-job.log
+    true
+
+    %d{HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
+
+
+    ${LOG_DIR}/mce-consumer-job.%i.log
+    1
+    3
+
+
+    100MB
+
+
+
-
+
+