Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ private static Map<String, Object> getMappingsForField(@Nonnull final Searchable
// Add keyword subfield with fielddata set to true for aggregation queries
subFields.put("keyword", ImmutableMap.of("type", "text", "analyzer", "urn_component", "fielddata", true));
}
mappingForField.put("fields", subFields);
if (!subFields.isEmpty()) {
mappingForField.put("fields", subFields);
}
} else if (fieldType == FieldType.BOOLEAN) {
mappingForField.put("type", "boolean");
} else if (fieldType == FieldType.COUNT) {
Expand All @@ -90,5 +92,4 @@ private static Map<String, Object> getMappingsForField(@Nonnull final Searchable

return mappings;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;


Expand Down Expand Up @@ -52,8 +53,7 @@ public void testMappingsBuilder() {
Map<String, Object> foreignKey = (Map<String, Object>) properties.get("foreignKey");
assertEquals(foreignKey.get("type"), "text");
assertEquals(foreignKey.get("analyzer"), "urn_component");
Map<String, Object> foreignKeySubfields = (Map<String, Object>) foreignKey.get("fields");
assertTrue(foreignKeySubfields.isEmpty());
assertFalse(foreignKey.containsKey("fields"));

// URN_PARTIAL
Map<String, Object> nestedForeignKey = (Map<String, Object>) properties.get("nestedForeignKey");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.metadata.kafka.config;

import com.linkedin.metadata.kafka.hydrator.HydratorFactory;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.kafka.hydrator.EntityHydrator;
import com.linkedin.metadata.restli.DefaultRestliClientFactory;
import com.linkedin.restli.client.Client;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -9,7 +10,7 @@


@Configuration
public class HydratorFactoryConfig {
public class EntityHydratorConfig {
@Value("${GMS_HOST:localhost}")
private String gmsHost;
@Value("${GMS_PORT:8080}")
Expand All @@ -20,8 +21,9 @@ public class HydratorFactoryConfig {
private String gmsSslProtocol;

@Bean
public HydratorFactory getHydratorFactory() {
public EntityHydrator getEntityHydrator() {
Client restClient = DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol);
return new HydratorFactory(restClient);
EntityClient entityClient = new EntityClient(restClient);
return new EntityHydrator(entityClient);
}
}
1 change: 1 addition & 0 deletions metadata-jobs/mae-consumer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {
compile (project(':gms:factories')) {
exclude group: 'org.neo4j.test'
}
compile project(':gms:client')
compile project(':metadata-utils')
compile project(":entity-registry")
compile project(':metadata-builders')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.linkedin.metadata.kafka.config;

import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.kafka.hydrator.EntityHydrator;
import com.linkedin.metadata.restli.DefaultRestliClientFactory;
import com.linkedin.restli.client.Client;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class EntityHydratorConfig {
@Value("${GMS_HOST:localhost}")
private String gmsHost;
@Value("${GMS_PORT:8080}")
private int gmsPort;
@Value("${GMS_USE_SSL:false}")
private boolean gmsUseSSL;
@Value("${GMS_SSL_PROTOCOL:#{null}}")
private String gmsSslProtocol;

@Bean
public EntityHydrator getEntityHydrator() {
Client restClient = DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol);
EntityClient entityClient = new EntityClient(restClient);
return new EntityHydrator(entityClient);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.linkedin.metadata.kafka.hydrator;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.data.template.RecordTemplate;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public abstract class BaseHydrator<SNAPSHOT extends RecordTemplate> {

/**
* Use values in the snapshot to hydrate the document
*/
protected abstract void hydrateFromSnapshot(ObjectNode document, SNAPSHOT snapshot);

}
Original file line number Diff line number Diff line change
@@ -1,46 +1,25 @@
package com.linkedin.metadata.kafka.hydrator;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.chart.ChartInfo;
import com.linkedin.common.urn.ChartUrn;
import com.linkedin.metadata.aspect.ChartAspect;
import com.linkedin.metadata.dao.RestliRemoteDAO;
import com.linkedin.metadata.snapshot.ChartSnapshot;
import com.linkedin.restli.client.Client;
import java.net.URISyntaxException;
import java.util.Optional;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public class ChartHydrator implements Hydrator {
private final Client _restliClient;
private final RestliRemoteDAO<ChartSnapshot, ChartAspect, ChartUrn> _remoteDAO;
public class ChartHydrator extends BaseHydrator<ChartSnapshot> {

private static final String DASHBOARD_TOOL = "dashboardTool";
private static final String TITLE = "title";

public ChartHydrator(@Nonnull Client restliClient) {
_restliClient = restliClient;
_remoteDAO = new RestliRemoteDAO<>(ChartSnapshot.class, ChartAspect.class, _restliClient);
}

@Override
public Optional<ObjectNode> getHydratedEntity(String urn) {
ChartUrn chartUrn;
try {
chartUrn = ChartUrn.createFromString(urn);
} catch (URISyntaxException e) {
log.info("Invalid Chart URN: {}", urn);
return Optional.empty();
protected void hydrateFromSnapshot(ObjectNode document, ChartSnapshot snapshot) {
for (ChartAspect aspect : snapshot.getAspects()) {
if (aspect.isChartInfo()) {
document.put(TITLE, aspect.getChartInfo().getTitle());
} else if (aspect.isChartKey()) {
document.put(DASHBOARD_TOOL, aspect.getChartKey().getDashboardTool());
}
}

ObjectNode jsonObject = HydratorFactory.OBJECT_MAPPER.createObjectNode();
jsonObject.put(DASHBOARD_TOOL, chartUrn.getDashboardToolEntity());

_remoteDAO.get(ChartInfo.class, chartUrn).ifPresent(chartInfo -> jsonObject.put(TITLE, chartInfo.getTitle()));

return Optional.of(jsonObject);
}
}
Original file line number Diff line number Diff line change
@@ -1,47 +1,25 @@
package com.linkedin.metadata.kafka.hydrator;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.common.urn.CorpuserUrn;
import com.linkedin.identity.CorpUserInfo;
import com.linkedin.metadata.aspect.CorpUserAspect;
import com.linkedin.metadata.dao.RestliRemoteDAO;
import com.linkedin.metadata.snapshot.CorpUserSnapshot;
import com.linkedin.restli.client.Client;
import java.net.URISyntaxException;
import java.util.Optional;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public class CorpUserHydrator implements Hydrator {
private final Client _restliClient;
private final RestliRemoteDAO<CorpUserSnapshot, CorpUserAspect, CorpuserUrn> _remoteDAO;
public class CorpUserHydrator extends BaseHydrator<CorpUserSnapshot> {

private static final String USER_NAME = "username";
private static final String NAME = "name";

public CorpUserHydrator(@Nonnull Client restliClient) {
_restliClient = restliClient;
_remoteDAO = new RestliRemoteDAO<>(CorpUserSnapshot.class, CorpUserAspect.class, _restliClient);
}

@Override
public Optional<ObjectNode> getHydratedEntity(String urn) {
CorpuserUrn corpuserUrn;
try {
corpuserUrn = CorpuserUrn.createFromString(urn);
} catch (URISyntaxException e) {
log.info("Invalid CorpUser URN: {}", urn);
return Optional.empty();
protected void hydrateFromSnapshot(ObjectNode document, CorpUserSnapshot snapshot) {
for (CorpUserAspect aspect : snapshot.getAspects()) {
if (aspect.isCorpUserInfo()) {
document.put(NAME, aspect.getCorpUserInfo().getDisplayName());
} else if (aspect.isCorpUserKey()) {
document.put(USER_NAME, aspect.getCorpUserKey().getUsername());
}
}

ObjectNode jsonObject = HydratorFactory.OBJECT_MAPPER.createObjectNode();
jsonObject.put(USER_NAME, corpuserUrn.getUsernameEntity());

_remoteDAO.get(CorpUserInfo.class, corpuserUrn)
.ifPresent(corpUserInfo -> jsonObject.put(NAME, corpUserInfo.getDisplayName()));

return Optional.of(jsonObject);
}
}
Original file line number Diff line number Diff line change
@@ -1,47 +1,24 @@
package com.linkedin.metadata.kafka.hydrator;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.common.urn.DashboardUrn;
import com.linkedin.dashboard.DashboardInfo;
import com.linkedin.metadata.aspect.DashboardAspect;
import com.linkedin.metadata.dao.RestliRemoteDAO;
import com.linkedin.metadata.snapshot.DashboardSnapshot;
import com.linkedin.restli.client.Client;
import java.net.URISyntaxException;
import java.util.Optional;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public class DashboardHydrator implements Hydrator {
private final Client _restliClient;
private final RestliRemoteDAO<DashboardSnapshot, DashboardAspect, DashboardUrn> _remoteDAO;

public class DashboardHydrator extends BaseHydrator<DashboardSnapshot> {
private static final String DASHBOARD_TOOL = "dashboardTool";
private static final String TITLE = "title";

public DashboardHydrator(@Nonnull Client restliClient) {
_restliClient = restliClient;
_remoteDAO = new RestliRemoteDAO<>(DashboardSnapshot.class, DashboardAspect.class, _restliClient);
}

@Override
public Optional<ObjectNode> getHydratedEntity(String urn) {
DashboardUrn dashboardUrn;
try {
dashboardUrn = DashboardUrn.createFromString(urn);
} catch (URISyntaxException e) {
log.info("Invalid Dashboard URN: {}", urn);
return Optional.empty();
protected void hydrateFromSnapshot(ObjectNode document, DashboardSnapshot snapshot) {
for (DashboardAspect aspect : snapshot.getAspects()) {
if (aspect.isDashboardInfo()) {
document.put(TITLE, aspect.getDashboardInfo().getTitle());
} else if (aspect.isDashboardKey()) {
document.put(DASHBOARD_TOOL, aspect.getDashboardKey().getDashboardTool());
}
}

ObjectNode jsonObject = HydratorFactory.OBJECT_MAPPER.createObjectNode();
jsonObject.put(DASHBOARD_TOOL, dashboardUrn.getDashboardToolEntity());

_remoteDAO.get(DashboardInfo.class, dashboardUrn)
.ifPresent(dashboardInfo -> jsonObject.put(TITLE, dashboardInfo.getTitle()));

return Optional.of(jsonObject);
}
}
Original file line number Diff line number Diff line change
@@ -1,46 +1,25 @@
package com.linkedin.metadata.kafka.hydrator;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.common.urn.DataFlowUrn;
import com.linkedin.datajob.DataFlowInfo;
import com.linkedin.metadata.aspect.DataFlowAspect;
import com.linkedin.metadata.dao.RestliRemoteDAO;
import com.linkedin.metadata.snapshot.DataFlowSnapshot;
import com.linkedin.restli.client.Client;
import java.net.URISyntaxException;
import java.util.Optional;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public class DataFlowHydrator implements Hydrator {
private final Client _restliClient;
private final RestliRemoteDAO<DataFlowSnapshot, DataFlowAspect, DataFlowUrn> _remoteDAO;
public class DataFlowHydrator extends BaseHydrator<DataFlowSnapshot> {

private static final String ORCHESTRATOR = "orchestrator";
private static final String NAME = "name";

public DataFlowHydrator(@Nonnull Client restliClient) {
_restliClient = restliClient;
_remoteDAO = new RestliRemoteDAO<>(DataFlowSnapshot.class, DataFlowAspect.class, _restliClient);
}

@Override
public Optional<ObjectNode> getHydratedEntity(String urn) {
DataFlowUrn dataFlowUrn;
try {
dataFlowUrn = DataFlowUrn.createFromString(urn);
} catch (URISyntaxException e) {
log.info("Invalid DataFlow URN: {}", urn);
return Optional.empty();
protected void hydrateFromSnapshot(ObjectNode document, DataFlowSnapshot snapshot) {
for (DataFlowAspect aspect : snapshot.getAspects()) {
if (aspect.isDataFlowInfo()) {
document.put(NAME, aspect.getDataFlowInfo().getName());
} else if (aspect.isDataFlowKey()) {
document.put(ORCHESTRATOR, aspect.getDataFlowKey().getOrchestrator());
}
}

ObjectNode jsonObject = HydratorFactory.OBJECT_MAPPER.createObjectNode();
jsonObject.put(ORCHESTRATOR, dataFlowUrn.getOrchestratorEntity());

_remoteDAO.get(DataFlowInfo.class, dataFlowUrn).ifPresent(dataFlowInfo -> jsonObject.put(NAME, dataFlowInfo.getName()));

return Optional.of(jsonObject);
}
}
Loading