Fixing issues
jjoyce0510 committed Jun 11, 2021
commit 1954d187e00fbd6c37180b73ff242bcd80b9d945
@@ -79,6 +79,8 @@ spec:
           - name: MAE_CONSUMER_ENABLED
             value: "true"
           {{- end }}
+          - name: DATAHUB_ANALYTICS_ENABLED
+            value: "{{ .Values.global.datahub_analytics_enabled }}"
           - name: EBEAN_DATASOURCE_USERNAME
            value: "{{ .Values.global.sql.datasource.username }}"
           - name: EBEAN_DATASOURCE_PASSWORD
datahub-kubernetes/datahub/charts/datahub-gms/values.yaml (2 additions, 0 deletions)
@@ -146,6 +146,8 @@ readinessProbe:
 #This section is useful if we are installing this chart separately for testing
 # helm install datahub-gms datahub-gms/
 global:
+  datahub_analytics_enabled: true
+
   elasticsearch:
     host: "elasticsearch"
     port: "9200"
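For reference, analytics can be switched off chart-wide by overriding the new default at install time; a minimal sketch, assuming a standard values override file (the file name is hypothetical):

```yaml
# my-values.yaml -- hypothetical override file, passed via `helm install -f my-values.yaml`
global:
  datahub_analytics_enabled: false
```

The deployment template above then renders this value into the GMS container as the DATAHUB_ANALYTICS_ENABLED environment variable.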
docker/datahub-gms/env/docker.env (3 additions, 0 deletions)
@@ -16,6 +16,9 @@ NEO4J_PASSWORD=datahub
 MAE_CONSUMER_ENABLED=true
 MCE_CONSUMER_ENABLED=true
 
+# Uncomment to disable persistence of client-side analytics events
+# DATAHUB_ANALYTICS_ENABLED=false
+
 # Uncomment to configure kafka topic names
 # Make sure these names are consistent across the whole deployment
 # METADATA_AUDIT_EVENT_NAME=MetadataAuditEvent_v4
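For the plain Docker setup the toggle is the same environment variable; assuming GMS is started with this env file (e.g. via docker-compose `env_file` or `docker run --env-file`), disabling analytics is just:

```
# docker/datahub-gms/env/docker.env
DATAHUB_ANALYTICS_ENABLED=false
```

When the variable is left unset, the new condition class below treats analytics as enabled.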
@@ -55,17 +55,29 @@ public Map<Urn, List<RecordTemplate>> getLatestAspects(@Nonnull final Set<Urn> u
     final Set<EbeanAspectV2.PrimaryKey> dbKeys = urns.stream()
         .map(urn -> {
           final Set<String> aspectsToFetch = aspectNames.isEmpty()
-              ? getEntityAspectNames(urn)
-              : aspectNames;
+            ? getEntityAspectNames(urn)
+            : aspectNames;
           return aspectsToFetch.stream()
-              .map(aspectName -> new EbeanAspectV2.PrimaryKey(urn.toString(), aspectName, LATEST_ASPECT_VERSION))
-              .collect(Collectors.toList());
+            .map(aspectName -> new EbeanAspectV2.PrimaryKey(urn.toString(), aspectName, LATEST_ASPECT_VERSION))
+            .collect(Collectors.toList());
         })
         .flatMap(List::stream)
         .collect(Collectors.toSet());
 
     // Fetch from db and populate urn -> aspect map.
     final Map<Urn, List<RecordTemplate>> urnToAspects = new HashMap<>();
+
+    // Each urn should have some result, regardless of whether aspects are found in the DB.
+    for (Urn urn: urns) {
+      urnToAspects.putIfAbsent(urn, new ArrayList<>());
+    }
+
+    // Add "key" aspects for each urn. TODO: Replace this with a materialized key aspect.
+    urnToAspects.keySet().forEach(key -> {
+      final RecordTemplate keyAspect = buildKeyAspect(key);
+      urnToAspects.get(key).add(keyAspect);
+    });
 
     _entityDao.batchGet(dbKeys).forEach((key, aspectEntry) -> {
       final Urn urn = toUrn(key.getUrn());
       final String aspectName = key.getAspect();
@@ -74,12 +86,6 @@ public Map<Urn, List<RecordTemplate>> getLatestAspects(@Nonnull final Set<Urn> u
       urnToAspects.get(urn).add(aspectRecord);
     });
 
-    // Add "key" aspects to any non null keys.
-    urnToAspects.keySet().forEach(key -> {
-      final RecordTemplate keyAspect = buildKeyAspect(key);
-      urnToAspects.get(key).add(keyAspect);
-    });
-
     return urnToAspects;
   }
 
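The reordering above is also a behavioral fix: every requested urn is now seeded into the result map and given its key aspect before the database results are merged in, so callers always get an entry back even for urns with no stored aspects. A minimal sketch of the resulting contract (the urn value and the `entityService` variable are hypothetical):

```java
// Assuming an entity with nothing persisted yet:
Urn urn = Urn.createFromString("urn:li:corpuser:ghost");

// An empty aspectNames set means "fetch all aspects registered for the entity".
Map<Urn, List<RecordTemplate>> result =
    entityService.getLatestAspects(Collections.singleton(urn), Collections.emptySet());

// Previously such an urn could be missing from the map entirely; now it maps
// to a list containing at least the generated key aspect.
assert result.containsKey(urn) && !result.get(urn).isEmpty();
```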
DataHubUsageEventsProcessor.java
@@ -1,6 +1,7 @@
 package com.linkedin.metadata.kafka;
 
 import com.linkedin.events.metadata.ChangeType;
+import com.linkedin.metadata.kafka.config.DataHubUsageEventsProcessorCondition;
 import com.linkedin.metadata.kafka.elasticsearch.ElasticsearchConnector;
 import com.linkedin.metadata.kafka.elasticsearch.JsonElasticEvent;
 import com.linkedin.metadata.kafka.transformer.DataHubUsageEventTransformer;
@@ -11,6 +12,7 @@
 import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.context.annotation.Conditional;
 import org.springframework.kafka.annotation.EnableKafka;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
@@ -19,6 +21,7 @@
 @Slf4j
 @Component
 @EnableKafka
+@Conditional(DataHubUsageEventsProcessorCondition.class)
 public class DataHubUsageEventsProcessor {
 
   private final ElasticsearchConnector elasticSearchConnector;
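Context on the mechanism: Spring evaluates the referenced Condition when it processes the bean definition, so if matches() returns false the component, including its @KafkaListener subscriptions, is never registered at all. A generic sketch of the pattern (all names here are illustrative, not part of this PR):

```java
@Component
@EnableKafka
@Conditional(MyFeatureCondition.class) // bean is skipped entirely when matches() is false
public class MyFeatureListener {
  @KafkaListener(topics = "my_topic")
  public void consume(ConsumerRecord<String, String> record) {
    // only ever invoked when the condition held at startup
  }
}
```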
MetadataAuditEventsProcessor.java
@@ -10,6 +10,7 @@
 import com.linkedin.metadata.extractor.FieldExtractor;
 import com.linkedin.metadata.graph.Edge;
 import com.linkedin.metadata.graph.GraphService;
+import com.linkedin.metadata.kafka.config.MetadataAuditEventsProcessorCondition;
 import com.linkedin.metadata.models.EntitySpec;
 import com.linkedin.metadata.models.RelationshipFieldSpec;
 import com.linkedin.metadata.models.registry.SnapshotEntityRegistry;
@@ -34,6 +35,7 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Conditional;
 import org.springframework.context.annotation.Import;
 import org.springframework.kafka.annotation.EnableKafka;
 import org.springframework.kafka.annotation.KafkaListener;
@@ -44,6 +46,7 @@
 
 @Slf4j
 @Component
+@Conditional(MetadataAuditEventsProcessorCondition.class)
 @Import({GraphServiceFactory.class, SearchServiceFactory.class})
 @EnableKafka
 public class MetadataAuditEventsProcessor {
DataHubUsageEventsProcessorCondition.java (new file)
@@ -0,0 +1,19 @@
+package com.linkedin.metadata.kafka.config;
+
+import org.springframework.context.annotation.Condition;
+import org.springframework.context.annotation.ConditionContext;
+import org.springframework.core.env.Environment;
+import org.springframework.core.type.AnnotatedTypeMetadata;
+
+
+public class DataHubUsageEventsProcessorCondition implements Condition {
+  @Override
+  public boolean matches(
+      ConditionContext context,
+      AnnotatedTypeMetadata metadata) {
+    Environment env = context.getEnvironment();
+    return "true".equals(env.getProperty("MAE_CONSUMER_ENABLED")) && (
+        env.getProperty("DATAHUB_ANALYTICS_ENABLED") == null ||
+        "true".equals(env.getProperty("DATAHUB_ANALYTICS_ENABLED")));
+  }
+}
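Note the default-on semantics: the usage event processor runs whenever the MAE consumer is enabled, unless DATAHUB_ANALYTICS_ENABLED is explicitly set to a value other than "true". The same gating logic, extracted as a standalone sketch (class and method names hypothetical):

```java
import java.util.function.Function;

final class AnalyticsGate {
  // env is any string lookup, e.g. System::getenv or a Spring Environment::getProperty
  static boolean enabled(Function<String, String> env) {
    String analytics = env.apply("DATAHUB_ANALYTICS_ENABLED");
    // MAE consumer must be on; analytics defaults to on when the variable is unset
    return "true".equals(env.apply("MAE_CONSUMER_ENABLED"))
        && (analytics == null || "true".equals(analytics));
  }
}
```

For example, `AnalyticsGate.enabled(System::getenv)` returns true when only MAE_CONSUMER_ENABLED=true is set.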
MetadataAuditEventsProcessorCondition.java
@@ -12,7 +12,6 @@ public boolean matches(
       ConditionContext context,
       AnnotatedTypeMetadata metadata) {
     Environment env = context.getEnvironment();
-    return env != null
-        && "true".equals(env.getProperty("MAE_CONSUMER_ENABLED"));
+    return "true".equals(env.getProperty("MAE_CONSUMER_ENABLED"));
   }
 }
MetadataChangeEventsProcessor.java
@@ -25,8 +25,8 @@
 
 @Slf4j
 @Component
-@EnableKafka
 @Conditional(MetadataChangeEventsProcessorCondition.class)
+@EnableKafka
 public class MetadataChangeEventsProcessor {
 
   private EntityClient entityClient;
MetadataChangeEventsProcessorCondition.java
@@ -12,7 +12,6 @@ public boolean matches(
       ConditionContext context,
       AnnotatedTypeMetadata metadata) {
     Environment env = context.getEnvironment();
-    return env != null
-        && "true".equals(env.getProperty("MCE_CONSUMER_ENABLED"));
+    return "true".equals(env.getProperty("MCE_CONSUMER_ENABLED"));
  }
 }