Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Merge MCE and MAE with GMS
  • Loading branch information
jjoyce0510 committed Jun 10, 2021
commit c0184d91f7c09ab971efbdf68e885ce4c8e6598d
2 changes: 2 additions & 0 deletions gms/war/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ ext.apiProject = project(':gms:api')

dependencies {
runtime project(':gms:factories')
runtime project(':metadata-jobs:mce-consumer-job')
runtime project(':metadata-jobs:mae-consumer-job')

runtime externalDependency.h2
runtime externalDependency.logbackClassic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.metadata.PegasusUtils.*;

Expand All @@ -35,9 +37,9 @@
public class EbeanEntityService extends EntityService {

private static final int DEFAULT_MAX_TRANSACTION_RETRY = 3;
private static final Logger LOG = LoggerFactory.getLogger("EbeanEntityService");

private final EbeanAspectDao _entityDao;

private Boolean _alwaysEmitAuditEvent = false;

public EbeanEntityService(
Expand All @@ -51,6 +53,9 @@ public EbeanEntityService(
@Override
@Nonnull
public Map<Urn, List<RecordTemplate>> getLatestAspects(@Nonnull final Set<Urn> urns, @Nonnull final Set<String> aspectNames) {

LOG.info(String.format("Getting latest aspects. urns: %s, aspectNames: %s.", urns, aspectNames));

// Create DB keys
final Set<EbeanAspectV2.PrimaryKey> dbKeys = urns.stream()
.map(urn -> {
Expand All @@ -74,11 +79,13 @@ public Map<Urn, List<RecordTemplate>> getLatestAspects(@Nonnull final Set<Urn> u

// Add "key" aspects for each urn. TODO: Replace this with a materialized key aspect.
urnToAspects.keySet().forEach(key -> {
LOG.info(String.format("Adding key aspect. urn: %s, aspectNames: %s.", key, aspectNames));
final RecordTemplate keyAspect = buildKeyAspect(key);
urnToAspects.get(key).add(keyAspect);
});

_entityDao.batchGet(dbKeys).forEach((key, aspectEntry) -> {
LOG.info(String.format("Successfully fetched aspect. urn: %s, aspect: %s.", key.getUrn(), key.getAspect()));
final Urn urn = toUrn(key.getUrn());
final String aspectName = key.getAspect();
final RecordTemplate aspectRecord = toAspectRecord(urn, aspectName, aspectEntry.getMetadata());
Expand Down Expand Up @@ -145,20 +152,27 @@ private RecordTemplate ingestAspect(
@Nonnull final AuditStamp auditStamp,
final int maxTransactionRetry) {

LOG.info(String.format("Ingesting aspect with urn: %s, name: %s", urn, aspectName));

final AddAspectResult result = _entityDao.runInTransactionWithRetry(() -> {

// 1. Fetch the latest existing version of the aspect.
final EbeanAspectV2 latest = _entityDao.getLatestAspect(urn.toString(), aspectName);

LOG.info(String.format("Fetched latest aspect to ingest with urn: %s, name: %s", urn, aspectName));

// 2. Compare the latest existing and new.
final RecordTemplate oldValue = latest == null ? null : toAspectRecord(urn, aspectName, latest.getMetadata());
final RecordTemplate newValue = updateLambda.apply(Optional.ofNullable(oldValue));

// 3. Skip updating if there is no difference between existing and new.
if (oldValue != null && DataTemplateUtil.areEqual(oldValue, newValue)) {
LOG.info(String.format("Aspect has not changed between previous and new versions. Skipping ingest. urn: %s, name: %s", urn, aspectName));
return new AddAspectResult(urn, oldValue, oldValue);
}

LOG.info(String.format("Found differing aspects. Ingesting new aspect version. urn: %s, name: %s", urn, aspectName));

// 4. Save the newValue as the latest version
_entityDao.saveLatestAspect(
urn.toString(),
Expand All @@ -182,7 +196,10 @@ private RecordTemplate ingestAspect(

// 5. Produce MAE after a successful update
if (oldValue != newValue || _alwaysEmitAuditEvent) {
LOG.info(String.format("Producing MAE for ingested aspect. urn: %s, name: %s", urn, aspectName));
produceMetadataAuditEvent(urn, oldValue, newValue);
} else {
LOG.info(String.format("Skipping MAE for ingested aspect. urn: %s, name: %s", urn, aspectName));
}

return newValue;
Expand Down Expand Up @@ -217,12 +234,16 @@ private RecordTemplate updateAspect(
@Nonnull final boolean emitMae,
final int maxTransactionRetry) {

LOG.info(String.format("Updating aspect with urn: %s, name: %s", urn, aspectName));

final AddAspectResult result = _entityDao.runInTransactionWithRetry(() -> {

final EbeanAspectV2 oldAspect = _entityDao.getAspect(urn.toString(), aspectName, version);
final RecordTemplate oldValue = oldAspect == null ? null : toAspectRecord(urn, aspectName,
oldAspect.getMetadata());

LOG.info(String.format("Fetched old aspect to update with urn: %s, name: %s. Saving new version.", urn, aspectName));

_entityDao.saveAspect(
urn.toString(),
aspectName,
Expand All @@ -242,7 +263,10 @@ private RecordTemplate updateAspect(
final RecordTemplate newValue = result.getNewValue();

if (emitMae) {
LOG.info(String.format("Producing MAE for updated aspect. %s, name: %s.", urn, aspectName));
produceMetadataAuditEvent(urn, oldValue, newValue);
} else {
LOG.info(String.format("Skipping MAE for updated aspect. %s, name: %s.", urn, aspectName));
}

return newValue;
Expand Down
11 changes: 1 addition & 10 deletions metadata-jobs/mae-consumer-job/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
plugins {
id 'org.springframework.boot'
id 'java'
}

Expand Down Expand Up @@ -31,10 +30,6 @@ dependencies {
compile externalDependency.kafkaAvroSerde
compile externalDependency.neo4jJavaDriver

compile (externalDependency.springBootStarterWeb) {
exclude module: "spring-boot-starter-tomcat"
}
compile externalDependency.springBootStarterJetty
compile externalDependency.springKafka
compile externalDependency.springActuator

Expand All @@ -59,8 +54,4 @@ compileJava.dependsOn avroSchemaSources

clean {
project.delete("src/main/resources/avro")
}

bootJar {
mainClassName = 'com.linkedin.metadata.kafka.MaeConsumerApplication'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,24 @@
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
@ConditionalOnProperty(value = "DATAHUB_ANALYTICS_ENABLED", havingValue = "true", matchIfMissing = true)
@EnableKafka
public class DataHubUsageEventsProcessor {

private final ElasticsearchConnector elasticSearchConnector;
private final DataHubUsageEventTransformer dataHubUsageEventTransformer;
private final String indexName;

public DataHubUsageEventsProcessor(ElasticsearchConnector elasticSearchConnector,
DataHubUsageEventTransformer dataHubUsageEventTransformer, IndexConvention indexConvention) {
public DataHubUsageEventsProcessor(
ElasticsearchConnector elasticSearchConnector,
DataHubUsageEventTransformer dataHubUsageEventTransformer,
IndexConvention indexConvention) {
this.elasticSearchConnector = elasticSearchConnector;
this.dataHubUsageEventTransformer = dataHubUsageEventTransformer;
this.indexName = indexConvention.getIndexName("datahub_usage_event");
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import com.linkedin.metadata.builders.search.GlossaryTermInfoIndexBuilder;
import com.linkedin.metadata.builders.search.GlossaryNodeInfoIndexBuilder;
import com.linkedin.metadata.restli.DefaultRestliClientFactory;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl;
import com.linkedin.restli.client.Client;
import java.util.HashSet;
import java.util.Set;
Expand Down Expand Up @@ -71,12 +69,4 @@ public Set<BaseIndexBuilder<? extends RecordTemplate>> indexBuilders(@Nonnull Cl
public Client restliClient() {
return DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort);
}

/**
* Convention for naming search indices
*/
@Bean
public IndexConvention indexConvention() {
return new IndexConventionImpl(indexPrefix);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

@Slf4j
@Configuration
public class KafkaConfig {
public class MaeKafkaConfig {
@Value("${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}")
private String kafkaBootstrapServer;
@Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class ElasticsearchConnectorFactory {

@Bean(name = "elasticsearchConnector")
@Nonnull
public ElasticsearchConnector createInstance(RestHighLevelClient elasticSearchRestHighLevelClient) {
public ElasticsearchConnector createInstance(@Nonnull RestHighLevelClient elasticSearchRestHighLevelClient) {
return new ElasticsearchConnector(elasticSearchRestHighLevelClient, bulkRequestsLimit, bulkFlushPeriod);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.restli.client.Client;
import java.net.URISyntaxException;
import java.util.Optional;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;


Expand All @@ -20,7 +21,7 @@ public class ChartHydrator implements Hydrator {
private static final String DASHBOARD_TOOL = "dashboardTool";
private static final String TITLE = "title";

public ChartHydrator(Client restliClient) {
public ChartHydrator(@Nonnull Client restliClient) {
_restliClient = restliClient;
_remoteDAO = new RestliRemoteDAO<>(ChartSnapshot.class, ChartAspect.class, _restliClient);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.restli.client.Client;
import java.net.URISyntaxException;
import java.util.Optional;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;


Expand All @@ -20,7 +21,7 @@ public class CorpUserHydrator implements Hydrator {
private static final String USER_NAME = "username";
private static final String NAME = "name";

public CorpUserHydrator(Client restliClient) {
public CorpUserHydrator(@Nonnull Client restliClient) {
_restliClient = restliClient;
_remoteDAO = new RestliRemoteDAO<>(CorpUserSnapshot.class, CorpUserAspect.class, _restliClient);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.restli.client.Client;
import java.net.URISyntaxException;
import java.util.Optional;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;


Expand All @@ -20,7 +21,7 @@ public class DashboardHydrator implements Hydrator {
private static final String DASHBOARD_TOOL = "dashboardTool";
private static final String TITLE = "title";

public DashboardHydrator(Client restliClient) {
public DashboardHydrator(@Nonnull Client restliClient) {
_restliClient = restliClient;
_remoteDAO = new RestliRemoteDAO<>(DashboardSnapshot.class, DashboardAspect.class, _restliClient);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.restli.client.Client;
import java.net.URISyntaxException;
import java.util.Optional;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;


Expand All @@ -20,7 +21,7 @@ public class DataFlowHydrator implements Hydrator {
private static final String ORCHESTRATOR = "orchestrator";
private static final String NAME = "name";

public DataFlowHydrator(Client restliClient) {
public DataFlowHydrator(@Nonnull Client restliClient) {
_restliClient = restliClient;
_remoteDAO = new RestliRemoteDAO<>(DataFlowSnapshot.class, DataFlowAspect.class, _restliClient);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.restli.client.Client;
import java.net.URISyntaxException;
import java.util.Optional;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;


Expand All @@ -20,7 +21,7 @@ public class DataJobHydrator implements Hydrator {
private static final String ORCHESTRATOR = "orchestrator";
private static final String NAME = "name";

public DataJobHydrator(Client restliClient) {
public DataJobHydrator(@Nonnull Client restliClient) {
_restliClient = restliClient;
_remoteDAO = new RestliRemoteDAO<>(DataJobSnapshot.class, DataJobAspect.class, _restliClient);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nonnull;


public class HydratorFactory {
Expand All @@ -14,7 +15,7 @@ public class HydratorFactory {

public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

public HydratorFactory(Client restliClient) {
public HydratorFactory(@Nonnull Client restliClient) {
_restliClient = restliClient;
_hydratorMap = new HashMap<>();
_hydratorMap.put(EntityType.DATASET, new DatasetHydrator());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.linkedin.metadata.kafka.hydrator.HydratorFactory;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nonnull;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -36,7 +37,7 @@ public static class TransformedDocument {
String document;
}

public DataHubUsageEventTransformer(HydratorFactory hydratorFactory) {
public DataHubUsageEventTransformer(@Nonnull HydratorFactory hydratorFactory) {
this.hydratorFactory = hydratorFactory;
}

Expand Down
13 changes: 2 additions & 11 deletions metadata-jobs/mce-consumer-job/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
plugins {
id 'org.springframework.boot'
id 'java'
}

Expand All @@ -26,12 +25,8 @@ dependencies {
compile spec.product.pegasus.restliCommon
compile externalDependency.elasticSearchRest
compile externalDependency.kafkaAvroSerde
compile (externalDependency.springBootStarterWeb) {
exclude module: "spring-boot-starter-tomcat"
}
compile externalDependency.springBootStarterJetty
compile externalDependency.springKafka

compile externalDependency.springKafka
compile externalDependency.springActuator

compileOnly externalDependency.lombok
Expand All @@ -54,8 +49,4 @@ compileJava.dependsOn avroSchemaSources

clean {
project.delete("src/main/resources/avro")
}

bootJar {
mainClassName = 'com.linkedin.metadata.kafka.MceConsumerApplication'
}
}

This file was deleted.

Loading