diff --git a/build.gradle b/build.gradle index 0bb918d9028054..8b0c2082eeb5ed 100644 --- a/build.gradle +++ b/build.gradle @@ -62,6 +62,7 @@ project.ext.externalDependency = [ 'guice': 'com.google.inject:guice:4.2.2', 'guava': 'com.google.guava:guava:27.0.1-jre', 'h2': 'com.h2database:h2:1.4.196', + 'hadoopClient': 'org.apache.hadoop:hadoop-client:3.1.0', 'hibernateCore': 'org.hibernate:hibernate-core:5.2.16.Final', 'httpClient': 'org.apache.httpcomponents:httpclient:4.5.9', 'iStackCommons': 'com.sun.istack:istack-commons-runtime:4.0.1', @@ -87,6 +88,7 @@ project.ext.externalDependency = [ 'neo4jHarness': 'org.neo4j.test:neo4j-harness:3.4.11', 'neo4jJavaDriver': 'org.neo4j.driver:neo4j-java-driver:4.0.1', 'parseqTest': 'com.linkedin.parseq:parseq:3.0.7:test', + 'parquet': 'org.apache.parquet:parquet-avro:1.12.0', 'picocli': 'info.picocli:picocli:4.5.0', 'playCache': 'com.typesafe.play:play-cache_2.11:2.6.18', 'playDocs': 'com.typesafe.play:play-docs_2.11:2.6.18', diff --git a/datahub-kubernetes/datahub/templates/datahub-upgrade/datahub-cleanup-job-template.yml b/datahub-kubernetes/datahub/templates/datahub-upgrade/datahub-cleanup-job-template.yml index 298e374fd8d293..7969bad8fb4a26 100644 --- a/datahub-kubernetes/datahub/templates/datahub-upgrade/datahub-cleanup-job-template.yml +++ b/datahub-kubernetes/datahub/templates/datahub-upgrade/datahub-cleanup-job-template.yml @@ -85,6 +85,9 @@ spec: name: "{{ .password.secretRef }}" key: "{{ .password.secretKey }}" {{- end }} + - name: GRAPH_SERVICE_IMPL + value: {{ .Values.global.graph_service_impl }} + {{- if eq .Values.global.graph_service_impl "neo4j" }} - name: NEO4J_HOST value: "{{ .Values.global.neo4j.host }}" - name: NEO4J_URI @@ -96,6 +99,7 @@ spec: secretKeyRef: name: "{{ .Values.global.neo4j.password.secretRef }}" key: "{{ .Values.global.neo4j.password.secretKey }}" + {{- end }} {{- with .Values.datahubUpgrade.extraEnvs }} {{- toYaml . | nindent 16 }} {{- end }} diff --git a/datahub-kubernetes/datahub/templates/datahub-upgrade/datahub-restore-indices-job-template.yml b/datahub-kubernetes/datahub/templates/datahub-upgrade/datahub-restore-indices-job-template.yml new file mode 100644 index 00000000000000..a98128677cb777 --- /dev/null +++ b/datahub-kubernetes/datahub/templates/datahub-upgrade/datahub-restore-indices-job-template.yml @@ -0,0 +1,119 @@ +{{- if .Values.datahubUpgrade.enabled -}} +# Job template for restoring indices by sending MAE corresponding to all entities in the local db +# Creates a suspended cronJob that you can use to create an adhoc job when ready to run clean up. +# Run the following command to do so +# kubectl create job --from=cronjob/<>-datahub-restore-indices-job-template datahub-restore-indices-job +apiVersion: batch/v1beta1 +kind: CronJob +metadata: + name: {{ .Release.Name }}-datahub-restore-indices-job-template + labels: + app.kubernetes.io/managed-by: {{ .Release.Service | quote }} + app.kubernetes.io/instance: {{ .Release.Name | quote }} + app.kubernetes.io/version: {{ .Chart.AppVersion }} + helm.sh/chart: "{{ .Chart.Name }}-{{ .Chart.Version }}" +spec: + schedule: "* * * * *" + suspend: true + jobTemplate: + spec: + template: + spec: + {{- with .Values.global.hostAliases }} + hostAliases: + {{- toYaml . | nindent 12 }} + {{- end }} + {{- with .Values.datahubUpgrade.serviceAccount }} + serviceAccountName: {{ . }} + {{- end }} + {{- with .Values.imagePullSecrets }} + imagePullSecrets: + {{- toYaml . | nindent 12 }} + {{- end }} + volumes: + {{- with .Values.datahubUpgrade.extraVolumes }} + {{- toYaml . | nindent 12 }} + {{- end }} + restartPolicy: Never + securityContext: + runAsUser: 1000 + fsGroup: 1000 + containers: + - name: datahub-upgrade-job + image: "{{ .Values.datahubUpgrade.image.repository }}:{{ .Values.datahubUpgrade.image.tag }}" + args: + - "-u" + - "RestoreIndices" + env: + - name: DATAHUB_GMS_HOST + value: {{ printf "%s-%s" .Release.Name "datahub-gms" }} + - name: DATAHUB_GMS_PORT + value: "{{ .Values.global.datahub.gms.port }}" + - name: DATAHUB_MAE_CONSUMER_HOST + value: {{ printf "%s-%s" .Release.Name "datahub-mae-consumer" }} + - name: DATAHUB_MAE_CONSUMER_PORT + value: "{{ .Values.global.datahub.mae_consumer.port }}" + - name: EBEAN_DATASOURCE_USERNAME + value: "{{ .Values.global.sql.datasource.username }}" + - name: EBEAN_DATASOURCE_PASSWORD + valueFrom: + secretKeyRef: + name: "{{ .Values.global.sql.datasource.password.secretRef }}" + key: "{{ .Values.global.sql.datasource.password.secretKey }}" + - name: EBEAN_DATASOURCE_HOST + value: "{{ .Values.global.sql.datasource.host }}" + - name: EBEAN_DATASOURCE_URL + value: "{{ .Values.global.sql.datasource.url }}" + - name: EBEAN_DATASOURCE_DRIVER + value: "{{ .Values.global.sql.datasource.driver }}" + - name: KAFKA_BOOTSTRAP_SERVER + value: "{{ .Values.global.kafka.bootstrap.server }}" + - name: KAFKA_SCHEMAREGISTRY_URL + value: "{{ .Values.global.kafka.schemaregistry.url }}" + - name: ELASTICSEARCH_HOST + value: {{ .Values.global.elasticsearch.host | quote }} + - name: ELASTICSEARCH_PORT + value: {{ .Values.global.elasticsearch.port | quote }} + {{- with .Values.global.elasticsearch.useSSL }} + - name: ELASTICSEARCH_USE_SSL + value: {{ . | quote }} + {{- end }} + {{- with .Values.global.elasticsearch.auth }} + - name: ELASTICSEARCH_USERNAME + value: {{ .username }} + - name: ELASTICSEARCH_PASSWORD + valueFrom: + secretKeyRef: + name: "{{ .password.secretRef }}" + key: "{{ .password.secretKey }}" + {{- end }} + - name: GRAPH_SERVICE_IMPL + value: {{ .Values.global.graph_service_impl }} + {{- if eq .Values.global.graph_service_impl "neo4j" }} + - name: NEO4J_HOST + value: "{{ .Values.global.neo4j.host }}" + - name: NEO4J_URI + value: "{{ .Values.global.neo4j.uri }}" + - name: NEO4J_USERNAME + value: "{{ .Values.global.neo4j.username }}" + - name: NEO4J_PASSWORD + valueFrom: + secretKeyRef: + name: "{{ .Values.global.neo4j.password.secretRef }}" + key: "{{ .Values.global.neo4j.password.secretKey }}" + {{- end }} + {{- with .Values.datahubUpgrade.extraEnvs }} + {{- toYaml . | nindent 16 }} + {{- end }} + volumeMounts: + {{- with .Values.datahubUpgrade.extraVolumeMounts }} + {{- toYaml . | nindent 16 }} + {{- end }} + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 300m + memory: 256Mi +{{- end -}} \ No newline at end of file diff --git a/datahub-kubernetes/datahub/templates/datahub-upgrade/datahub-upgrade-job.yml b/datahub-kubernetes/datahub/templates/datahub-upgrade/datahub-upgrade-job.yml index 2eecdbfcd9aed8..ee03347426d657 100644 --- a/datahub-kubernetes/datahub/templates/datahub-upgrade/datahub-upgrade-job.yml +++ b/datahub-kubernetes/datahub/templates/datahub-upgrade/datahub-upgrade-job.yml @@ -91,6 +91,9 @@ spec: name: "{{ .password.secretRef }}" key: "{{ .password.secretKey }}" {{- end }} + - name: GRAPH_SERVICE_IMPL + value: {{ .Values.global.graph_service_impl }} + {{- if eq .Values.global.graph_service_impl "neo4j" }} - name: NEO4J_HOST value: "{{ .Values.global.neo4j.host }}" - name: NEO4J_URI @@ -102,6 +105,7 @@ spec: secretKeyRef: name: "{{ .Values.global.neo4j.password.secretRef }}" key: "{{ .Values.global.neo4j.password.secretKey }}" + {{- end }} {{- with .Values.datahubUpgrade.extraEnvs }} {{- toYaml . | nindent 12 }} {{- end }} diff --git a/datahub-upgrade/build.gradle b/datahub-upgrade/build.gradle index 51c7ca782febe4..fbcf6e58a260b0 100644 --- a/datahub-upgrade/build.gradle +++ b/datahub-upgrade/build.gradle @@ -7,10 +7,13 @@ dependencies { compile project(':metadata-io') compile project(':gms:impl') compile project(':gms:factories') + compile project(':gms:client') compile externalDependency.javaxInject + compile externalDependency.hadoopClient compile externalDependency.lombok compile externalDependency.picocli + compile externalDependency.parquet compile externalDependency.springBeans compile externalDependency.springBootAutoconfigure compile externalDependency.springCore diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCli.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCli.java index d0c3c67425d596..e7c730d6e6d249 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCli.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCli.java @@ -2,8 +2,10 @@ import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager; import com.linkedin.datahub.upgrade.nocode.NoCodeUpgrade; -import java.util.List; import com.linkedin.datahub.upgrade.nocodecleanup.NoCodeCleanupUpgrade; +import com.linkedin.datahub.upgrade.restorebackup.RestoreBackup; +import com.linkedin.datahub.upgrade.restoreindices.RestoreIndices; +import java.util.List; import javax.inject.Inject; import javax.inject.Named; import lombok.extern.slf4j.Slf4j; @@ -34,10 +36,20 @@ private static final class Args { @Named("noCodeCleanup") private NoCodeCleanupUpgrade noCodeCleanup; + @Inject + @Named("restoreIndices") + private RestoreIndices restoreIndices; + + @Inject + @Named("restoreBackup") + private RestoreBackup restoreBackup; + @Override public void run(String... cmdLineArgs) { _upgradeManager.register(noCodeUpgrade); _upgradeManager.register(noCodeCleanup); + _upgradeManager.register(restoreIndices); + _upgradeManager.register(restoreBackup); final Args args = new Args(); new CommandLine(args).setCaseInsensitiveEnumValuesAllowed(true).parseArgs(cmdLineArgs); diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java index aeff08ab8a6f2d..b6c30d2c064ecb 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java @@ -7,7 +7,7 @@ @SuppressWarnings("checkstyle:HideUtilityClassConstructor") @SpringBootApplication(exclude = {RestClientAutoConfiguration.class}, scanBasePackages = { - "com.linkedin.gms.factory.common", "com.linkedin.datahub.upgrade.config", "com.linkedin.gms.factory.entity"}) + "com.linkedin.gms.factory.common", "com.linkedin.gms.factory.search", "com.linkedin.datahub.upgrade.config", "com.linkedin.gms.factory.entity"}) public class UpgradeCliApplication { public static void main(String[] args) { new SpringApplicationBuilder(UpgradeCliApplication.class, UpgradeCli.class).web(WebApplicationType.NONE).run(args); diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeStep.java index 66811abb91d8a8..b85bd7a51e3dd5 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeStep.java @@ -2,6 +2,7 @@ import java.util.function.Function; + /** * Represents a single executable step in an {@link Upgrade}. */ @@ -31,4 +32,10 @@ default boolean isOptional() { return false; } + /** + * Returns whether or not to skip the step based on the UpgradeContext + */ + default boolean skip(UpgradeContext context) { + return false; + } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/common/steps/ClearGraphServiceStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/common/steps/ClearGraphServiceStep.java new file mode 100644 index 00000000000000..49ae629f78cbad --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/common/steps/ClearGraphServiceStep.java @@ -0,0 +1,58 @@ +package com.linkedin.datahub.upgrade.common.steps; + +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.UpgradeStepResult; +import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; +import com.linkedin.datahub.upgrade.nocode.NoCodeUpgrade; +import com.linkedin.metadata.graph.GraphService; +import java.util.function.Function; + + +public class ClearGraphServiceStep implements UpgradeStep { + + private final String deletePattern = ".*"; + + private final GraphService _graphService; + private final boolean _alwaysRun; + + public ClearGraphServiceStep(final GraphService graphService, final boolean alwaysRun) { + _graphService = graphService; + _alwaysRun = alwaysRun; + } + + @Override + public String id() { + return "ClearGraphServiceStep"; + } + + @Override + public boolean skip(UpgradeContext context) { + if (_alwaysRun) { + return false; + } + if (context.parsedArgs().containsKey(NoCodeUpgrade.CLEAN_ARG_NAME)) { + return false; + } + context.report().addLine("Cleanup has not been requested."); + return true; + } + + @Override + public int retryCount() { + return 1; + } + + @Override + public Function executable() { + return (context) -> { + try { + _graphService.clear(); + } catch (Exception e) { + context.report().addLine(String.format("Failed to clear graph indices: %s", e.toString())); + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED); + } + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED); + }; + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/common/steps/ClearSearchServiceStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/common/steps/ClearSearchServiceStep.java new file mode 100644 index 00000000000000..0d5024e60b432b --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/common/steps/ClearSearchServiceStep.java @@ -0,0 +1,56 @@ +package com.linkedin.datahub.upgrade.common.steps; + +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.UpgradeStepResult; +import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; +import com.linkedin.datahub.upgrade.nocode.NoCodeUpgrade; +import com.linkedin.metadata.search.SearchService; +import java.util.function.Function; + + +public class ClearSearchServiceStep implements UpgradeStep { + + private final SearchService _searchService; + private final boolean _alwaysRun; + + public ClearSearchServiceStep(final SearchService searchService, final boolean alwaysRun) { + _searchService = searchService; + _alwaysRun = alwaysRun; + } + + @Override + public String id() { + return "ClearSearchServiceStep"; + } + + @Override + public boolean skip(UpgradeContext context) { + if (_alwaysRun) { + return false; + } + if (context.parsedArgs().containsKey(NoCodeUpgrade.CLEAN_ARG_NAME)) { + return false; + } + context.report().addLine("Cleanup has not been requested."); + return true; + } + + @Override + public int retryCount() { + return 1; + } + + @Override + public Function executable() { + return (context) -> { + try { + _searchService.clear(); + } catch (Exception e) { + context.report().addLine(String.format("Failed to clear search service: %s", e.toString())); + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED); + } + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED); + }; + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/common/steps/GMSDisableWriteModeStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/common/steps/GMSDisableWriteModeStep.java new file mode 100644 index 00000000000000..c43bda480c4ea3 --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/common/steps/GMSDisableWriteModeStep.java @@ -0,0 +1,39 @@ +package com.linkedin.datahub.upgrade.common.steps; + +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.UpgradeStepResult; +import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; +import com.linkedin.entity.client.EntityClient; +import java.util.function.Function; +import lombok.RequiredArgsConstructor; + + +@RequiredArgsConstructor +public class GMSDisableWriteModeStep implements UpgradeStep { + private final EntityClient _entityClient; + + @Override + public String id() { + return "GMSDisableWriteModeStep"; + } + + @Override + public int retryCount() { + return 2; + } + + @Override + public Function executable() { + return (context) -> { + try { + _entityClient.setWritable(false); + } catch (Exception e) { + e.printStackTrace(); + context.report().addLine("Failed to turn write mode off in GMS"); + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED); + } + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED); + }; + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/common/steps/GMSEnableWriteModeStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/common/steps/GMSEnableWriteModeStep.java new file mode 100644 index 00000000000000..db5d229d57b18d --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/common/steps/GMSEnableWriteModeStep.java @@ -0,0 +1,40 @@ +package com.linkedin.datahub.upgrade.common.steps; + +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.UpgradeStepResult; +import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; +import com.linkedin.entity.client.EntityClient; +import java.util.function.Function; +import lombok.RequiredArgsConstructor; + + +@RequiredArgsConstructor +public class GMSEnableWriteModeStep implements UpgradeStep { + + private final EntityClient _entityClient; + + @Override + public String id() { + return "GMSEnableWriteModeStep"; + } + + @Override + public int retryCount() { + return 2; + } + + @Override + public Function executable() { + return (context) -> { + try { + _entityClient.setWritable(true); + } catch (Exception e) { + e.printStackTrace(); + context.report().addLine("Failed to turn write mode back on in GMS"); + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED); + } + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED); + }; + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/GMSQualificationStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/common/steps/GMSQualificationStep.java similarity index 97% rename from datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/GMSQualificationStep.java rename to datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/common/steps/GMSQualificationStep.java index 8ef5006c12efdc..75336a9b1f6da9 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/GMSQualificationStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/common/steps/GMSQualificationStep.java @@ -1,4 +1,4 @@ -package com.linkedin.datahub.upgrade.nocode; +package com.linkedin.datahub.upgrade.common.steps; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -39,7 +39,7 @@ private static String convertStreamToString(InputStream is) { return sb.toString(); } - GMSQualificationStep() { } + public GMSQualificationStep() { } @Override public String id() { diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/EntityClientFactory.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/EntityClientFactory.java new file mode 100644 index 00000000000000..bfccfca091cc33 --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/EntityClientFactory.java @@ -0,0 +1,27 @@ +package com.linkedin.datahub.upgrade.config; + +import com.linkedin.entity.client.EntityClient; +import com.linkedin.metadata.restli.DefaultRestliClientFactory; +import com.linkedin.restli.client.Client; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +@Configuration +public class EntityClientFactory { + @Value("${DATAHUB_GMS_HOST:localhost}") + private String gmsHost; + @Value("${DATAHUB_GMS_PORT:8080}") + private int gmsPort; + @Value("${DATAHUB_GMS_USE_SSL:false}") + private boolean gmsUseSSL; + @Value("${DATAHUB_GMS_SSL_PROTOCOL:#{null}}") + private String gmsSslProtocol; + + @Bean("entityClient") + public EntityClient getEntityClient() { + Client restClient = DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol); + return new EntityClient(restClient); + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/NoCodeCleanupConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/NoCodeCleanupConfig.java index c53353cbef61e4..cd962b75df22d1 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/NoCodeCleanupConfig.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/NoCodeCleanupConfig.java @@ -2,6 +2,7 @@ import com.linkedin.datahub.upgrade.nocodecleanup.NoCodeCleanupUpgrade; import com.linkedin.metadata.graph.GraphService; +import com.linkedin.metadata.utils.elasticsearch.IndexConvention; import io.ebean.EbeanServerFactory; import io.ebean.config.ServerConfig; import javax.annotation.Nonnull; @@ -12,6 +13,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; +import static com.linkedin.gms.factory.common.IndexConventionFactory.INDEX_CONVENTION_BEAN; import static com.linkedin.metadata.entity.ebean.EbeanAspectDao.EBEAN_MODEL_PACKAGE; @@ -22,15 +24,16 @@ public class NoCodeCleanupConfig { ApplicationContext applicationContext; @Bean(name = "noCodeCleanup") - @DependsOn({"gmsEbeanServiceConfig", "graphService", "elasticSearchRestHighLevelClient"}) + @DependsOn({"gmsEbeanServiceConfig", "graphService", "elasticSearchRestHighLevelClient", INDEX_CONVENTION_BEAN}) @Nonnull public NoCodeCleanupUpgrade createInstance() { final ServerConfig serverConfig = applicationContext.getBean(ServerConfig.class); final GraphService graphClient = applicationContext.getBean(GraphService.class); final RestHighLevelClient searchClient = applicationContext.getBean(RestHighLevelClient.class); + final IndexConvention indexConvention = applicationContext.getBean(IndexConvention.class); if (!serverConfig.getPackages().contains(EBEAN_MODEL_PACKAGE)) { serverConfig.getPackages().add(EBEAN_MODEL_PACKAGE); } - return new NoCodeCleanupUpgrade(EbeanServerFactory.create(serverConfig), graphClient, searchClient); + return new NoCodeCleanupUpgrade(EbeanServerFactory.create(serverConfig), graphClient, searchClient, indexConvention); } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/NoCodeUpgradeConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/NoCodeUpgradeConfig.java index 0b1871f46f59ca..0f6d28f199937a 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/NoCodeUpgradeConfig.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/NoCodeUpgradeConfig.java @@ -1,6 +1,7 @@ package com.linkedin.datahub.upgrade.config; import com.linkedin.datahub.upgrade.nocode.NoCodeUpgrade; +import com.linkedin.entity.client.EntityClient; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.models.registry.SnapshotEntityRegistry; import io.ebean.EbeanServerFactory; @@ -12,7 +13,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; -import static com.linkedin.metadata.entity.ebean.EbeanAspectDao.*; +import static com.linkedin.metadata.entity.ebean.EbeanAspectDao.EBEAN_MODEL_PACKAGE; @Configuration @@ -22,17 +23,18 @@ public class NoCodeUpgradeConfig { ApplicationContext applicationContext; @Bean(name = "noCodeUpgrade") - @DependsOn({"gmsEbeanServiceConfig", "entityService"}) + @DependsOn({"gmsEbeanServiceConfig", "entityService", "entityClient"}) @Nonnull public NoCodeUpgrade createInstance() { final ServerConfig serverConfig = applicationContext.getBean(ServerConfig.class); final EntityService entityService = applicationContext.getBean(EntityService.class); + final EntityClient entityClient = applicationContext.getBean(EntityClient.class); final SnapshotEntityRegistry entityRegistry = new SnapshotEntityRegistry(); if (!serverConfig.getPackages().contains(EBEAN_MODEL_PACKAGE)) { serverConfig.getPackages().add(EBEAN_MODEL_PACKAGE); } - return new NoCodeUpgrade(EbeanServerFactory.create(serverConfig), entityService, entityRegistry); + return new NoCodeUpgrade(EbeanServerFactory.create(serverConfig), entityService, entityRegistry, entityClient); } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/RestoreBackupConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/RestoreBackupConfig.java new file mode 100644 index 00000000000000..b8297556abb6de --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/RestoreBackupConfig.java @@ -0,0 +1,43 @@ +package com.linkedin.datahub.upgrade.config; + +import com.linkedin.datahub.upgrade.restorebackup.RestoreBackup; +import com.linkedin.entity.client.EntityClient; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.graph.GraphService; +import com.linkedin.metadata.models.registry.SnapshotEntityRegistry; +import com.linkedin.metadata.search.SearchService; +import io.ebean.EbeanServerFactory; +import io.ebean.config.ServerConfig; +import javax.annotation.Nonnull; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; + +import static com.linkedin.metadata.entity.ebean.EbeanAspectDao.EBEAN_MODEL_PACKAGE; + + +@Configuration +public class RestoreBackupConfig { + @Autowired + ApplicationContext applicationContext; + + @Bean(name = "restoreBackup") + @DependsOn({"gmsEbeanServiceConfig", "entityService", "entityClient", "graphService", "searchService"}) + @Nonnull + public RestoreBackup createInstance() { + final ServerConfig serverConfig = applicationContext.getBean(ServerConfig.class); + final EntityService entityService = applicationContext.getBean(EntityService.class); + final EntityClient entityClient = applicationContext.getBean(EntityClient.class); + final GraphService graphClient = applicationContext.getBean(GraphService.class); + final SearchService searchClient = applicationContext.getBean(SearchService.class); + + if (!serverConfig.getPackages().contains(EBEAN_MODEL_PACKAGE)) { + serverConfig.getPackages().add(EBEAN_MODEL_PACKAGE); + } + + return new RestoreBackup(EbeanServerFactory.create(serverConfig), entityService, + SnapshotEntityRegistry.getInstance(), entityClient, graphClient, searchClient); + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/RestoreIndicesConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/RestoreIndicesConfig.java new file mode 100644 index 00000000000000..a97f321d397b9d --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/RestoreIndicesConfig.java @@ -0,0 +1,41 @@ +package com.linkedin.datahub.upgrade.config; + +import com.linkedin.datahub.upgrade.restoreindices.RestoreIndices; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.graph.GraphService; +import com.linkedin.metadata.models.registry.SnapshotEntityRegistry; +import com.linkedin.metadata.search.SearchService; +import io.ebean.EbeanServerFactory; +import io.ebean.config.ServerConfig; +import javax.annotation.Nonnull; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.DependsOn; + +import static com.linkedin.metadata.entity.ebean.EbeanAspectDao.EBEAN_MODEL_PACKAGE; + + +@Configuration +public class RestoreIndicesConfig { + @Autowired + ApplicationContext applicationContext; + + @Bean(name = "restoreIndices") + @DependsOn({"gmsEbeanServiceConfig", "entityService", "searchService", "graphService"}) + @Nonnull + public RestoreIndices createInstance() { + final ServerConfig serverConfig = applicationContext.getBean(ServerConfig.class); + final EntityService entityService = applicationContext.getBean(EntityService.class); + final SearchService searchService = applicationContext.getBean(SearchService.class); + final GraphService graphService = applicationContext.getBean(GraphService.class); + + if (!serverConfig.getPackages().contains(EBEAN_MODEL_PACKAGE)) { + serverConfig.getPackages().add(EBEAN_MODEL_PACKAGE); + } + + return new RestoreIndices(EbeanServerFactory.create(serverConfig), entityService, + SnapshotEntityRegistry.getInstance(), searchService, graphService); + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/impl/DefaultUpgradeManager.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/impl/DefaultUpgradeManager.java index 091fd46f94f447..0f2c6608b41afb 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/impl/DefaultUpgradeManager.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/impl/DefaultUpgradeManager.java @@ -37,7 +37,8 @@ private UpgradeResult executeInternal(Upgrade upgrade, List args) { final UpgradeContext context = new DefaultUpgradeContext(upgrade, upgradeReport, new ArrayList<>(), args); upgradeReport.addLine(String.format("Starting upgrade with id %s...", upgrade.id())); UpgradeResult result = executeInternal(context); - upgradeReport.addLine(String.format("Upgrade %s completed with result %s. Exiting...", upgrade.id(), result.result())); + upgradeReport.addLine( + String.format("Upgrade %s completed with result %s. Exiting...", upgrade.id(), result.result())); executeCleanupInternal(context, result); return result; } @@ -52,30 +53,38 @@ private UpgradeResult executeInternal(UpgradeContext context) { for (int i = 0; i < steps.size(); i++) { final UpgradeStep step = steps.get(i); - upgradeReport.addLine(String.format(String.format("Executing Step %s/%s: %s...", i + 1, steps.size(), step.id()), upgrade.id())); + if (step.skip(context)) { + upgradeReport.addLine( + String.format(String.format("Skipping Step %s/%s: %s...", i + 1, steps.size(), step.id()), upgrade.id())); + continue; + } + + upgradeReport.addLine( + String.format(String.format("Executing Step %s/%s: %s...", i + 1, steps.size(), step.id()), upgrade.id())); final UpgradeStepResult stepResult = executeStepInternal(context, step); stepResults.add(stepResult); // Apply Actions if (UpgradeStepResult.Action.ABORT.equals(stepResult.action())) { - upgradeReport.addLine(String.format("Step with id %s requested an abort of the in-progress update. Aborting the upgrade...", step.id())); + upgradeReport.addLine( + String.format("Step with id %s requested an abort of the in-progress update. Aborting the upgrade...", + step.id())); return new DefaultUpgradeResult(UpgradeResult.Result.ABORTED, upgradeReport); } // Handle Results if (UpgradeStepResult.Result.FAILED.equals(stepResult.result())) { if (step.isOptional()) { - upgradeReport.addLine(String.format("Failed Step %s/%s: %s. Step marked as optional. Proceeding with upgrade...", i + 1, steps.size(), step.id())); + upgradeReport.addLine( + String.format("Failed Step %s/%s: %s. Step marked as optional. Proceeding with upgrade...", i + 1, + steps.size(), step.id())); continue; } // Required step failed. Fail the entire upgrade process. upgradeReport.addLine( - String.format("Failed Step %s/%s: %s. Failed after %s retries.", - i + 1, - steps.size(), - step.id(), + String.format("Failed Step %s/%s: %s. Failed after %s retries.", i + 1, steps.size(), step.id(), step.retryCount())); upgradeReport.addLine(String.format("Exiting upgrade %s with failure.", upgrade.id())); return new DefaultUpgradeResult(UpgradeResult.Result.FAILED, upgradeReport); @@ -98,26 +107,18 @@ private UpgradeStepResult executeStepInternal(UpgradeContext context, UpgradeSte if (result == null) { // Failed to even retrieve a result. Create a default failure result. - result = new DefaultUpgradeStepResult( - step.id(), - UpgradeStepResult.Result.FAILED - ); + result = new DefaultUpgradeStepResult(step.id(), UpgradeStepResult.Result.FAILED); context.report().addLine(String.format("Retrying %s more times...", maxAttempts - (i + 1))); } if (UpgradeStepResult.Result.SUCCEEDED.equals(result.result())) { break; } - } catch (Exception e) { - context.report().addLine(String.format("Caught exception during attempt %s of Step with id %s: %s", - i, - step.id(), - e.toString() - )); - result = new DefaultUpgradeStepResult( - step.id(), - UpgradeStepResult.Result.FAILED); + context.report() + .addLine( + String.format("Caught exception during attempt %s of Step with id %s: %s", i, step.id(), e.toString())); + result = new DefaultUpgradeStepResult(step.id(), UpgradeStepResult.Result.FAILED); context.report().addLine(String.format("Retrying %s more times...", maxAttempts - (i + 1))); } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/GMSEnableWriteModeStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/GMSEnableWriteModeStep.java deleted file mode 100644 index e7db2ca368be34..00000000000000 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/GMSEnableWriteModeStep.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.linkedin.datahub.upgrade.nocode; - -import com.linkedin.datahub.upgrade.UpgradeContext; -import com.linkedin.datahub.upgrade.UpgradeStep; -import com.linkedin.datahub.upgrade.UpgradeStepResult; -import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.function.Function; - - -public class GMSEnableWriteModeStep implements UpgradeStep { - - GMSEnableWriteModeStep() { } - - @Override - public String id() { - return "GMSEnableWriteModeStep"; - } - - @Override - public int retryCount() { - return 2; - } - - @Override - public Function executable() { - return (context) -> { - String gmsHost = System.getenv("DATAHUB_GMS_HOST") == null ? "localhost" : System.getenv("DATAHUB_GMS_HOST"); - String gmsPort = System.getenv("DATAHUB_GMS_PORT") == null ? "8080" : System.getenv("DATAHUB_GMS_PORT"); - try { - String spec = String.format("http://%s:%s/entities?action=setWritable", gmsHost, gmsPort); - - HttpURLConnection gmsConnection = (HttpURLConnection) new URL(spec).openConnection(); - gmsConnection.setRequestMethod("POST"); - gmsConnection.connect(); - - if (gmsConnection.getResponseCode() != 200) { - System.out.printf("Failed to turn write mode back on in GMS after migration, received %i", gmsConnection.getResponseCode()); - } else { - System.out.printf("Re-enabled write mode in GMS"); - } - } catch (Exception e) { - e.printStackTrace(); - System.out.printf("Failed to turn write mode back on in GMS after migration"); - } - return new DefaultUpgradeStepResult( - id(), - UpgradeStepResult.Result.SUCCEEDED); - }; - } -} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/NoCodeUpgrade.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/NoCodeUpgrade.java index bbf85536b81aea..793b7f6db75a8b 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/NoCodeUpgrade.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/NoCodeUpgrade.java @@ -3,6 +3,9 @@ import com.linkedin.datahub.upgrade.Upgrade; import com.linkedin.datahub.upgrade.UpgradeCleanupStep; import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.common.steps.GMSEnableWriteModeStep; +import com.linkedin.datahub.upgrade.common.steps.GMSQualificationStep; +import com.linkedin.entity.client.EntityClient; import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.models.registry.SnapshotEntityRegistry; import io.ebean.EbeanServer; @@ -24,11 +27,13 @@ public class NoCodeUpgrade implements Upgrade { public NoCodeUpgrade( final EbeanServer server, final EntityService entityService, - final SnapshotEntityRegistry entityRegistry) { + final SnapshotEntityRegistry entityRegistry, + final EntityClient entityClient) { _steps = buildUpgradeSteps( server, entityService, - entityRegistry); + entityRegistry, + entityClient); _cleanupSteps = buildCleanupSteps(server); } @@ -54,7 +59,8 @@ private List buildCleanupSteps(final EbeanServer server) { private List buildUpgradeSteps( final EbeanServer server, final EntityService entityService, - final SnapshotEntityRegistry entityRegistry) { + final SnapshotEntityRegistry entityRegistry, + final EntityClient entityClient) { final List steps = new ArrayList<>(); steps.add(new RemoveAspectV2TableStep(server)); steps.add(new GMSQualificationStep()); @@ -62,7 +68,7 @@ private List buildUpgradeSteps( steps.add(new CreateAspectTableStep(server)); steps.add(new IngestDataPlatformsStep(entityService)); steps.add(new DataMigrationStep(server, entityService, entityRegistry)); - steps.add(new GMSEnableWriteModeStep()); + steps.add(new GMSEnableWriteModeStep(entityClient)); return steps; } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/RemoveAspectV2TableStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/RemoveAspectV2TableStep.java index efaecfcda87b68..440884470463d2 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/RemoveAspectV2TableStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocode/RemoveAspectV2TableStep.java @@ -27,11 +27,18 @@ public String id() { @Override public Function executable() { return (context) -> { - if (context.parsedArgs().containsKey(NoCodeUpgrade.CLEAN_ARG_NAME)) { - context.report().addLine("Cleanup requested. Dropping metadata_aspect_v2"); - _server.execute(_server.createSqlUpdate("DROP TABLE IF EXISTS metadata_aspect_v2")); - } + context.report().addLine("Cleanup requested. Dropping metadata_aspect_v2"); + _server.execute(_server.createSqlUpdate("DROP TABLE IF EXISTS metadata_aspect_v2")); return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED); }; } + + @Override + public boolean skip(UpgradeContext context) { + if (context.parsedArgs().containsKey(NoCodeUpgrade.CLEAN_ARG_NAME)) { + return false; + } + context.report().addLine("Cleanup has not been requested."); + return true; + } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocodecleanup/DeleteLegacySearchIndicesStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocodecleanup/DeleteLegacySearchIndicesStep.java index 6ce14e28d5e528..fb0212e591f677 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocodecleanup/DeleteLegacySearchIndicesStep.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocodecleanup/DeleteLegacySearchIndicesStep.java @@ -4,21 +4,25 @@ import com.linkedin.datahub.upgrade.UpgradeStep; import com.linkedin.datahub.upgrade.UpgradeStepResult; import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; +import com.linkedin.metadata.utils.elasticsearch.IndexConvention; import java.util.function.Function; +import lombok.RequiredArgsConstructor; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; // Do we need SQL-tech specific migration paths? +@RequiredArgsConstructor public class DeleteLegacySearchIndicesStep implements UpgradeStep { - private final String deletePattern = "*document*"; + private final String deletePattern; private final RestHighLevelClient _searchClient; - public DeleteLegacySearchIndicesStep(final RestHighLevelClient searchClient) { + public DeleteLegacySearchIndicesStep(final RestHighLevelClient searchClient, final IndexConvention indexConvention) { _searchClient = searchClient; + deletePattern = indexConvention.getPrefix().map(p -> p + "_").orElse("") + "*document*"; } @Override diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocodecleanup/NoCodeCleanupUpgrade.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocodecleanup/NoCodeCleanupUpgrade.java index 6719af8957ed6f..c9a13c2208a569 100644 --- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocodecleanup/NoCodeCleanupUpgrade.java +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/nocodecleanup/NoCodeCleanupUpgrade.java @@ -4,6 +4,7 @@ import com.linkedin.datahub.upgrade.UpgradeCleanupStep; import com.linkedin.datahub.upgrade.UpgradeStep; import com.linkedin.metadata.graph.GraphService; +import com.linkedin.metadata.utils.elasticsearch.IndexConvention; import io.ebean.EbeanServer; import java.util.ArrayList; import java.util.Collections; @@ -17,14 +18,9 @@ public class NoCodeCleanupUpgrade implements Upgrade { private final List _cleanupSteps; // Upgrade requires the EbeanServer. - public NoCodeCleanupUpgrade( - final EbeanServer server, - final GraphService graphClient, - final RestHighLevelClient searchClient) { - _steps = buildUpgradeSteps( - server, - graphClient, - searchClient); + public NoCodeCleanupUpgrade(final EbeanServer server, final GraphService graphClient, + final RestHighLevelClient searchClient, final IndexConvention indexConvention) { + _steps = buildUpgradeSteps(server, graphClient, searchClient, indexConvention); _cleanupSteps = buildCleanupSteps(); } @@ -47,15 +43,13 @@ private List buildCleanupSteps() { return Collections.emptyList(); } - private List buildUpgradeSteps( - final EbeanServer server, - final GraphService graphClient, - final RestHighLevelClient searchClient) { + private List buildUpgradeSteps(final EbeanServer server, final GraphService graphClient, + final RestHighLevelClient searchClient, final IndexConvention indexConvention) { final List steps = new ArrayList<>(); steps.add(new NoCodeUpgradeQualificationStep(server)); steps.add(new DeleteAspectTableStep(server)); steps.add(new DeleteLegacyGraphRelationshipsStep(graphClient)); - steps.add(new DeleteLegacySearchIndicesStep(searchClient)); + steps.add(new DeleteLegacySearchIndicesStep(searchClient, indexConvention)); return steps; } } diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/ClearAspectV2TableStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/ClearAspectV2TableStep.java new file mode 100644 index 00000000000000..711cccf742254c --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/ClearAspectV2TableStep.java @@ -0,0 +1,35 @@ +package com.linkedin.datahub.upgrade.restorebackup; + +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.UpgradeStepResult; +import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; +import com.linkedin.metadata.entity.ebean.EbeanAspectV2; +import io.ebean.EbeanServer; +import java.util.function.Function; + + +/** + * Optional step for removing Aspect V2 table. + */ +public class ClearAspectV2TableStep implements UpgradeStep { + + private final EbeanServer _server; + + public ClearAspectV2TableStep(final EbeanServer server) { + _server = server; + } + + @Override + public String id() { + return "ClearAspectV2TableStep"; + } + + @Override + public Function executable() { + return (context) -> { + _server.find(EbeanAspectV2.class).delete(); + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED); + }; + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/RestoreBackup.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/RestoreBackup.java new file mode 100644 index 00000000000000..a1837125c17dd1 --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/RestoreBackup.java @@ -0,0 +1,59 @@ +package com.linkedin.datahub.upgrade.restorebackup; + +import com.google.common.collect.ImmutableList; +import com.linkedin.datahub.upgrade.Upgrade; +import com.linkedin.datahub.upgrade.UpgradeCleanupStep; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.common.steps.ClearGraphServiceStep; +import com.linkedin.datahub.upgrade.common.steps.ClearSearchServiceStep; +import com.linkedin.datahub.upgrade.common.steps.GMSDisableWriteModeStep; +import com.linkedin.datahub.upgrade.common.steps.GMSEnableWriteModeStep; +import com.linkedin.datahub.upgrade.common.steps.GMSQualificationStep; +import com.linkedin.entity.client.EntityClient; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.graph.GraphService; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.search.SearchService; +import io.ebean.EbeanServer; +import java.util.ArrayList; +import java.util.List; + + +public class RestoreBackup implements Upgrade { + + private final List _steps; + + public RestoreBackup(final EbeanServer server, final EntityService entityService, final EntityRegistry entityRegistry, + final EntityClient entityClient, final GraphService graphClient, final SearchService searchClient) { + _steps = buildSteps(server, entityService, entityRegistry, entityClient, graphClient, searchClient); + } + + @Override + public String id() { + return "RestoreBackup"; + } + + @Override + public List steps() { + return _steps; + } + + private List buildSteps(final EbeanServer server, final EntityService entityService, + final EntityRegistry entityRegistry, final EntityClient entityClient, final GraphService graphClient, + final SearchService searchClient) { + final List steps = new ArrayList<>(); + steps.add(new GMSQualificationStep()); + steps.add(new GMSDisableWriteModeStep(entityClient)); + steps.add(new ClearSearchServiceStep(searchClient, true)); + steps.add(new ClearGraphServiceStep(graphClient, true)); + steps.add(new ClearAspectV2TableStep(server)); + steps.add(new RestoreStorageStep(entityService, entityRegistry)); + steps.add(new GMSEnableWriteModeStep(entityClient)); + return steps; + } + + @Override + public List cleanupSteps() { + return ImmutableList.of(); + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/RestoreStorageStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/RestoreStorageStep.java new file mode 100644 index 00000000000000..10eacbfb42571c --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/RestoreStorageStep.java @@ -0,0 +1,135 @@ +package com.linkedin.datahub.upgrade.restorebackup; + +import com.google.common.collect.ImmutableList; +import com.linkedin.common.AuditStamp; +import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.UpgradeStepResult; +import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; +import com.linkedin.datahub.upgrade.restorebackup.backupreader.BackupReader; +import com.linkedin.datahub.upgrade.restorebackup.backupreader.EbeanAspectBackupIterator; +import com.linkedin.datahub.upgrade.restorebackup.backupreader.LocalParquetReader; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ebean.EbeanAspectV2; +import com.linkedin.metadata.entity.ebean.EbeanUtils; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.models.registry.EntityRegistry; +import java.net.URISyntaxException; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + + +public class RestoreStorageStep implements UpgradeStep { + + private static final int REPORT_BATCH_SIZE = 1000; + + private final EntityService _entityService; + private final EntityRegistry _entityRegistry; + private final Map _backupReaders; + + public RestoreStorageStep(final EntityService entityService, final EntityRegistry entityRegistry) { + _entityService = entityService; + _entityRegistry = entityRegistry; + _backupReaders = ImmutableList.of(new LocalParquetReader()) + .stream() + .collect(Collectors.toMap(BackupReader::getName, Function.identity())); + } + + @Override + public String id() { + return "RestoreStorageStep"; + } + + @Override + public int retryCount() { + return 0; + } + + @Override + public Function executable() { + return (context) -> { + + context.report().addLine("Starting backup restore..."); + int numRows = 0; + Optional backupReaderName = context.parsedArgs().get("BACKUP_READER"); + if (!backupReaderName.isPresent() || !_backupReaders.containsKey(backupReaderName.get())) { + context.report().addLine("BACKUP_READER is not set or is not valid"); + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED); + } + + EbeanAspectBackupIterator iterator = _backupReaders.get(backupReaderName.get()).getBackupIterator(context); + EbeanAspectV2 aspect; + while ((aspect = iterator.next()) != null) { + numRows++; + + // 1. Extract an Entity type from the entity Urn + Urn urn; + try { + urn = Urn.createFromString(aspect.getKey().getUrn()); + } catch (Exception e) { + context.report() + .addLine( + String.format("Failed to bind Urn with value %s into Urn object: %s", aspect.getKey().getUrn(), e)); + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED); + } + + // 2. Verify that the entity associated with the aspect is found in the registry. + final String entityName = urn.getEntityType(); + final EntitySpec entitySpec; + try { + entitySpec = _entityRegistry.getEntitySpec(entityName); + } catch (Exception e) { + context.report() + .addLine(String.format("Failed to find Entity with name %s in Entity Registry: %s", entityName, e)); + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED); + } + final String aspectName = aspect.getKey().getAspect(); + + // 3. Create record from json aspect + final RecordTemplate aspectRecord = + EbeanUtils.toAspectRecord(entityName, aspectName, aspect.getMetadata(), _entityRegistry); + + // 4. Verify that the aspect is a valid aspect associated with the entity + try { + entitySpec.getAspectSpec(aspectName); + } catch (Exception e) { + context.report() + .addLine(String.format("Failed to find aspect spec with name %s associated with entity named %s: %s", + aspectName, entityName, e)); + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED); + } + + // 5. Write the row back using the EntityService + boolean emitMae = aspect.getKey().getVersion() == 0L; + _entityService.updateAspect(urn, aspectName, aspectRecord, toAuditStamp(aspect), aspect.getKey().getVersion(), + emitMae); + + if (numRows % REPORT_BATCH_SIZE == 0) { + context.report().addLine(String.format("Successfully inserted %d rows", numRows)); + } + } + + context.report().addLine(String.format("Added %d rows to the aspect v2 table", numRows)); + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED); + }; + } + + private AuditStamp toAuditStamp(final EbeanAspectV2 aspect) { + final AuditStamp auditStamp = new AuditStamp(); + auditStamp.setTime(aspect.getCreatedOn().getTime()); + + try { + auditStamp.setActor(new Urn(aspect.getCreatedBy())); + if (aspect.getCreatedFor() != null) { + auditStamp.setImpersonator(new Urn(aspect.getCreatedFor())); + } + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + return auditStamp; + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/backupreader/BackupReader.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/backupreader/BackupReader.java new file mode 100644 index 00000000000000..7bb6faa3d4bf94 --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/backupreader/BackupReader.java @@ -0,0 +1,16 @@ +package com.linkedin.datahub.upgrade.restorebackup.backupreader; + +import com.linkedin.datahub.upgrade.UpgradeContext; +import javax.annotation.Nonnull; + + +/** + * Base interface for BackupReader used for creating the BackupIterator to retrieve EbeanAspectV2 object to be + * ingested back into GMS + */ +public interface BackupReader { + String getName(); + + @Nonnull + EbeanAspectBackupIterator getBackupIterator(UpgradeContext context); +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/backupreader/EbeanAspectBackupIterator.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/backupreader/EbeanAspectBackupIterator.java new file mode 100644 index 00000000000000..e08578a19f7a1c --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/backupreader/EbeanAspectBackupIterator.java @@ -0,0 +1,14 @@ +package com.linkedin.datahub.upgrade.restorebackup.backupreader; + +import com.linkedin.metadata.entity.ebean.EbeanAspectV2; +import java.io.Closeable; + + +/** + * Base interface for iterators that retrieves EbeanAspectV2 objects + * This allows us to restore from backups of various format + */ +public interface EbeanAspectBackupIterator extends Closeable { + // Get the next row in backup. Return null if finished. + EbeanAspectV2 next(); +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/backupreader/LocalParquetReader.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/backupreader/LocalParquetReader.java new file mode 100644 index 00000000000000..54856e4b9d264d --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/backupreader/LocalParquetReader.java @@ -0,0 +1,49 @@ +package com.linkedin.datahub.upgrade.restorebackup.backupreader; + +import com.linkedin.datahub.upgrade.UpgradeContext; +import java.io.IOException; +import java.util.Optional; +import javax.annotation.Nonnull; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.hadoop.ParquetReader; + + +/** + * BackupReader for retrieving EbeanAspectV2 objects from a local parquet file + */ +@Slf4j +public class LocalParquetReader implements BackupReader { + + public LocalParquetReader() { + // Need below to solve issue with hadoop path class not working in linux systems + // https://stackoverflow.com/questions/41864985/hadoop-ioexception-failure-to-login + UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser("hduser")); + } + + @Override + public String getName() { + return "LOCAL_PARQUET"; + } + + @Nonnull + @Override + public EbeanAspectBackupIterator getBackupIterator(UpgradeContext context) { + Optional path = context.parsedArgs().get("BACKUP_FILE_PATH"); + if (!path.isPresent()) { + context.report().addLine("BACKUP_FILE_PATH must be set to run RestoreBackup through local parquet file"); + throw new IllegalArgumentException( + "BACKUP_FILE_PATH must be set to run RestoreBackup through local parquet file"); + } + + try { + ParquetReader reader = AvroParquetReader.builder(new Path(path.get())).build(); + return new ParquetEbeanAspectBackupIterator(reader); + } catch (IOException e) { + throw new RuntimeException(String.format("Failed to build ParquetReader: %s", e)); + } + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/backupreader/ParquetEbeanAspectBackupIterator.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/backupreader/ParquetEbeanAspectBackupIterator.java new file mode 100644 index 00000000000000..afde3451ded327 --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restorebackup/backupreader/ParquetEbeanAspectBackupIterator.java @@ -0,0 +1,48 @@ +package com.linkedin.datahub.upgrade.restorebackup.backupreader; + +import com.linkedin.metadata.entity.ebean.EbeanAspectV2; +import java.io.IOException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import org.apache.avro.generic.GenericRecord; +import org.apache.parquet.hadoop.ParquetReader; + + +/** + * Iterator to retrieve EbeanAspectV2 objects from the ParquetReader + * Converts the avro GenericRecord object into EbeanAspectV2 + */ +@RequiredArgsConstructor +public class ParquetEbeanAspectBackupIterator implements EbeanAspectBackupIterator { + private final ParquetReader _parquetReader; + + @Override + public EbeanAspectV2 next() { + try { + GenericRecord record = _parquetReader.read(); + if (record == null) { + return null; + } + return convertRecord(record); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + } + + @Override + public void close() throws IOException { + _parquetReader.close(); + } + + private EbeanAspectV2 convertRecord(GenericRecord record) { + EbeanAspectV2.PrimaryKey key = + new EbeanAspectV2.PrimaryKey(record.get("urn").toString(), record.get("aspect").toString(), + (Long) record.get("version")); + return new EbeanAspectV2(key, record.get("metadata").toString(), + Timestamp.from(Instant.ofEpochMilli((Long) record.get("createdon") / 1000)), record.get("createdby").toString(), + Optional.ofNullable(record.get("createdfor")).map(Object::toString).orElse(null)); + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/RestoreIndices.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/RestoreIndices.java new file mode 100644 index 00000000000000..a811d94c9566b0 --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/RestoreIndices.java @@ -0,0 +1,54 @@ +package com.linkedin.datahub.upgrade.restoreindices; + +import com.google.common.collect.ImmutableList; +import com.linkedin.datahub.upgrade.Upgrade; +import com.linkedin.datahub.upgrade.UpgradeCleanupStep; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.common.steps.ClearGraphServiceStep; +import com.linkedin.datahub.upgrade.common.steps.ClearSearchServiceStep; +import com.linkedin.datahub.upgrade.common.steps.GMSQualificationStep; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.graph.GraphService; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.search.SearchService; +import io.ebean.EbeanServer; +import java.util.ArrayList; +import java.util.List; + + +public class RestoreIndices implements Upgrade { + public static final String BATCH_SIZE_ARG_NAME = "batchSize"; + public static final String BATCH_DELAY_MS_ARG_NAME = "batchDelayMs"; + + private final List _steps; + + public RestoreIndices(final EbeanServer server, final EntityService entityService, + final EntityRegistry entityRegistry, final SearchService searchService, final GraphService graphService) { + _steps = buildSteps(server, entityService, entityRegistry, searchService, graphService); + } + + @Override + public String id() { + return "RestoreIndices"; + } + + @Override + public List steps() { + return _steps; + } + + private List buildSteps(final EbeanServer server, final EntityService entityService, + final EntityRegistry entityRegistry, final SearchService searchService, final GraphService graphService) { + final List steps = new ArrayList<>(); + steps.add(new GMSQualificationStep()); + steps.add(new ClearSearchServiceStep(searchService, false)); + steps.add(new ClearGraphServiceStep(graphService, false)); + steps.add(new SendMAEStep(server, entityService, entityRegistry)); + return steps; + } + + @Override + public List cleanupSteps() { + return ImmutableList.of(); + } +} diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java new file mode 100644 index 00000000000000..a5446bebb6f2c9 --- /dev/null +++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java @@ -0,0 +1,158 @@ +package com.linkedin.datahub.upgrade.restoreindices; + +import com.linkedin.common.urn.Urn; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.datahub.upgrade.UpgradeContext; +import com.linkedin.datahub.upgrade.UpgradeStep; +import com.linkedin.datahub.upgrade.UpgradeStepResult; +import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult; +import com.linkedin.datahub.upgrade.nocode.NoCodeUpgrade; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.entity.ebean.EbeanAspectV2; +import com.linkedin.metadata.entity.ebean.EbeanUtils; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.models.registry.EntityRegistry; +import io.ebean.EbeanServer; +import io.ebean.PagedList; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + + +public class SendMAEStep implements UpgradeStep { + + private static final int DEFAULT_BATCH_SIZE = 1000; + private static final long DEFAULT_BATCH_DELAY_MS = 250; + + private final EbeanServer _server; + private final EntityService _entityService; + private final EntityRegistry _entityRegistry; + + public SendMAEStep(final EbeanServer server, final EntityService entityService, final EntityRegistry entityRegistry) { + _server = server; + _entityService = entityService; + _entityRegistry = entityRegistry; + } + + @Override + public String id() { + return "SendMAEStep"; + } + + @Override + public int retryCount() { + return 0; + } + + @Override + public Function executable() { + return (context) -> { + + context.report().addLine("Sending MAE from local DB..."); + final int rowCount = _server.find(EbeanAspectV2.class).where().eq(EbeanAspectV2.VERSION_COLUMN, 0).findCount(); + context.report().addLine(String.format("Found %s latest aspects in aspects table", rowCount)); + + int totalRowsMigrated = 0; + int start = 0; + int count = getBatchSize(context.parsedArgs()); + while (start < rowCount) { + + context.report() + .addLine(String.format("Reading rows %s through %s from the aspects table.", start, start + count)); + PagedList rows = getPagedAspects(start, count); + + for (EbeanAspectV2 aspect : rows.getList()) { + // 1. Extract an Entity type from the entity Urn + Urn urn; + try { + urn = Urn.createFromString(aspect.getKey().getUrn()); + } catch (Exception e) { + context.report() + .addLine( + String.format("Failed to bind Urn with value %s into Urn object: %s", aspect.getKey().getUrn(), e)); + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED); + } + + // 2. Verify that the entity associated with the aspect is found in the registry. + final String entityName = urn.getEntityType(); + final EntitySpec entitySpec; + try { + entitySpec = _entityRegistry.getEntitySpec(entityName); + } catch (Exception e) { + context.report() + .addLine(String.format("Failed to find Entity with name %s in Entity Registry: %s", entityName, e)); + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED); + } + final String aspectName = aspect.getKey().getAspect(); + + // 3. Create record from json aspect + final RecordTemplate aspectRecord = + EbeanUtils.toAspectRecord(entityName, aspectName, aspect.getMetadata(), _entityRegistry); + + // 4. Verify that the aspect is a valid aspect associated with the entity + try { + entitySpec.getAspectSpec(aspectName); + } catch (Exception e) { + context.report() + .addLine(String.format("Failed to find aspect spec with name %s associated with entity named %s: %s", + aspectName, entityName, e)); + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED); + } + + // 5. Produce MAE events for the aspect record + _entityService.produceMetadataAuditEvent(urn, null, aspectRecord); + + totalRowsMigrated++; + } + context.report().addLine(String.format("Successfully sent MAEs for %s rows", totalRowsMigrated)); + start = start + count; + try { + TimeUnit.MILLISECONDS.sleep(getBatchDelayMs(context.parsedArgs())); + } catch (InterruptedException e) { + throw new RuntimeException("Thread interrupted while sleeping after successful batch migration."); + } + } + if (totalRowsMigrated != rowCount) { + context.report() + .addLine( + String.format("Number of MAEs sent %s does not equal the number of input rows %s...", totalRowsMigrated, + rowCount)); + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.FAILED); + } + return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED); + }; + } + + private PagedList getPagedAspects(final int start, final int pageSize) { + return _server.find(EbeanAspectV2.class) + .select(EbeanAspectV2.ALL_COLUMNS) + .where() + .eq(EbeanAspectV2.VERSION_COLUMN, 0) + .orderBy() + .asc(EbeanAspectV2.URN_COLUMN) + .orderBy() + .asc(EbeanAspectV2.ASPECT_COLUMN) + .setFirstRow(start) + .setMaxRows(pageSize) + .findPagedList(); + } + + private int getBatchSize(final Map> parsedArgs) { + int resolvedBatchSize = DEFAULT_BATCH_SIZE; + if (parsedArgs.containsKey(RestoreIndices.BATCH_SIZE_ARG_NAME) && parsedArgs.get(NoCodeUpgrade.BATCH_SIZE_ARG_NAME) + .isPresent()) { + resolvedBatchSize = Integer.parseInt(parsedArgs.get(RestoreIndices.BATCH_SIZE_ARG_NAME).get()); + } + return resolvedBatchSize; + } + + private long getBatchDelayMs(final Map> parsedArgs) { + long resolvedBatchDelayMs = DEFAULT_BATCH_DELAY_MS; + if (parsedArgs.containsKey(RestoreIndices.BATCH_DELAY_MS_ARG_NAME) && parsedArgs.get( + NoCodeUpgrade.BATCH_DELAY_MS_ARG_NAME).isPresent()) { + resolvedBatchDelayMs = Long.parseLong(parsedArgs.get(RestoreIndices.BATCH_DELAY_MS_ARG_NAME).get()); + } + return resolvedBatchDelayMs; + } +} diff --git a/docker/datahub-upgrade/README.md b/docker/datahub-upgrade/README.md index 3a32e0ee818ecc..c5a4371fef0509 100644 --- a/docker/datahub-upgrade/README.md +++ b/docker/datahub-upgrade/README.md @@ -15,6 +15,10 @@ to metadata_aspect_v2 table. Arguments: 2. **NoCodeDataMigrationCleanup**: Cleanses graph index, search index, and key-value store of legacy DataHub data (metadata_aspect table) once the No Code Data Migration has completed successfully. No arguments. +3. **RestoreIndices**: Restores indices by fetching the latest version of each aspect and producing MAE + +4. **RestoreBackup**: Restores the storage stack from a backup of the local database + ## Environment Variables To run the `datahub-upgrade` container, some environment variables must be provided in order to tell the upgrade CLI diff --git a/docker/datahub-upgrade/env/docker-without-neo4j.env b/docker/datahub-upgrade/env/docker-without-neo4j.env new file mode 100644 index 00000000000000..cab79c600ad191 --- /dev/null +++ b/docker/datahub-upgrade/env/docker-without-neo4j.env @@ -0,0 +1,28 @@ +EBEAN_DATASOURCE_USERNAME=datahub +EBEAN_DATASOURCE_PASSWORD=datahub +EBEAN_DATASOURCE_HOST=mysql:3306 +EBEAN_DATASOURCE_URL=jdbc:mysql://mysql:3306/datahub?verifyServerCertificate=false&useSSL=true&useUnicode=yes&characterEncoding=UTF-8 +EBEAN_DATASOURCE_DRIVER=com.mysql.jdbc.Driver + +KAFKA_BOOTSTRAP_SERVER=broker:29092 +KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081 + +ELASTICSEARCH_HOST=elasticsearch +ELASTICSEARCH_PORT=9200 + +GRAPH_SERVICE_IMPL=elasticsearch + +DATAHUB_GMS_HOST=datahub-gms +DATAHUB_GMS_PORT=8080 + + +# Uncomment and set these to support SSL connection to Elasticsearch +# ELASTICSEARCH_USE_SSL= +# ELASTICSEARCH_SSL_PROTOCOL= +# ELASTICSEARCH_SSL_SECURE_RANDOM_IMPL= +# ELASTICSEARCH_SSL_TRUSTSTORE_FILE= +# ELASTICSEARCH_SSL_TRUSTSTORE_TYPE= +# ELASTICSEARCH_SSL_TRUSTSTORE_PASSWORD= +# ELASTICSEARCH_SSL_KEYSTORE_FILE= +# ELASTICSEARCH_SSL_KEYSTORE_TYPE= +# ELASTICSEARCH_SSL_KEYSTORE_PASSWORD= \ No newline at end of file diff --git a/docker/datahub-upgrade/env/docker.env b/docker/datahub-upgrade/env/docker.env index 23f3bc59b8aca6..52481c0a8e03b4 100644 --- a/docker/datahub-upgrade/env/docker.env +++ b/docker/datahub-upgrade/env/docker.env @@ -10,6 +10,7 @@ KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081 ELASTICSEARCH_HOST=elasticsearch ELASTICSEARCH_PORT=9200 +GRAPH_SERVICE_IMPL=neo4j NEO4J_HOST=http://neo4j:7474 NEO4J_URI=bolt://neo4j NEO4J_USERNAME=neo4j @@ -18,8 +19,6 @@ NEO4J_PASSWORD=datahub DATAHUB_GMS_HOST=datahub-gms DATAHUB_GMS_PORT=8080 -DATAHUB_MAE_CONSUMER_HOST=datahub-mae-consumer -DATAHUB_MAE_CONSUMER_PORT=9091 # Uncomment and set these to support SSL connection to Elasticsearch # ELASTICSEARCH_USE_SSL= diff --git a/docs-website/sidebars.js b/docs-website/sidebars.js index 8d05e002242e1d..113899bc6be15e 100644 --- a/docs-website/sidebars.js +++ b/docs-website/sidebars.js @@ -87,6 +87,7 @@ module.exports = { "docs/how/configure-oidc-react", "docs/how/sso/configure-oidc-react-google", "docs/how/sso/configure-oidc-react-okta", + "docs/how/restore-indices", "datahub-web-react/src/app/analytics/README", ], Components: [ diff --git a/docs/how/restore-indices.md b/docs/how/restore-indices.md new file mode 100644 index 00000000000000..43061f730261cd --- /dev/null +++ b/docs/how/restore-indices.md @@ -0,0 +1,42 @@ +# Restoring Search and Graph Indices from Local Database + +If search or graph services go down or you have made changes to them that require reindexing, you can restore them from +the aspects stored in the local database. + +When a new version of the aspect gets ingested, GMS initiates an MAE event for the aspect which is consumed to update +the search and graph indices. As such, we can fetch the latest version of each aspect in the local database and produce +MAE events corresponding to the aspects to restore the search and graph indices. + +## Docker-compose + +Run the following command from root to send MAE for each aspect in the Local DB. + +``` +./docker/datahub-upgrade/datahub-upgrade.sh -u RestoreIndices +``` + +If you need to clear the search and graph indices before restoring, add `-a clean` to the end of the command. + +Refer to this [doc](../../docker/datahub-upgrade/README.md#environment-variables) on how to set environment variables +for your environment. + +## Kubernetes + +Run `kubectl get cronjobs` to see if the restoration job template has been deployed. If you see results like below, you +are good to go. + +``` +NAME SCHEDULE SUSPEND ACTIVE LAST SCHEDULE AGE +datahub-datahub-cleanup-job-template * * * * * True 0 2d3h +datahub-datahub-restore-indices-job-template * * * * * True 0 2d3h +``` + +If not, deploy latest helm charts to use this functionality. + +Once restore indices job template has been deployed, run the following command to start a job that restores indices. + +``` +kubectl create job --from=cronjob/datahub-datahub-restore-indices-job-template restore-indices-adhoc +``` + +Once the job completes, your indices will have been restored. \ No newline at end of file diff --git a/gms/api/src/main/idl/com.linkedin.entity.entities.restspec.json b/gms/api/src/main/idl/com.linkedin.entity.entities.restspec.json index ba6a4c4e325273..3779adfaf118c7 100644 --- a/gms/api/src/main/idl/com.linkedin.entity.entities.restspec.json +++ b/gms/api/src/main/idl/com.linkedin.entity.entities.restspec.json @@ -111,7 +111,12 @@ } ], "returns" : "com.linkedin.metadata.query.SearchResult" }, { - "name" : "setWritable" + "name" : "setWritable", + "parameters" : [ { + "name" : "value", + "type" : "boolean", + "default" : "true" + } ] } ], "entity" : { "path" : "/entities/{entitiesId}" diff --git a/gms/api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json b/gms/api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json index 6ce21bf3756032..56d0ea4a84ea32 100644 --- a/gms/api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json +++ b/gms/api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json @@ -4032,7 +4032,12 @@ } ], "returns" : "com.linkedin.metadata.query.SearchResult" }, { - "name" : "setWritable" + "name" : "setWritable", + "parameters" : [ { + "name" : "value", + "type" : "boolean", + "default" : "true" + } ] } ], "entity" : { "path" : "/entities/{entitiesId}" diff --git a/gms/client/src/main/java/com/linkedin/entity/client/EntityClient.java b/gms/client/src/main/java/com/linkedin/entity/client/EntityClient.java index 4be6d824ea68a0..519f74f819d04a 100644 --- a/gms/client/src/main/java/com/linkedin/entity/client/EntityClient.java +++ b/gms/client/src/main/java/com/linkedin/entity/client/EntityClient.java @@ -1,6 +1,7 @@ package com.linkedin.entity.client; import com.linkedin.common.urn.Urn; +import com.linkedin.entity.EntitiesDoSetWritableRequestBuilder; import com.linkedin.restli.client.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -259,4 +260,10 @@ public StringArray getBrowsePaths(@Nonnull Urn urn) throws RemoteInvocationExcep .urnParam(urn); return sendClientRequest(requestBuilder.build()).getEntity(); } + + public void setWritable(boolean canWrite) throws RemoteInvocationException { + EntitiesDoSetWritableRequestBuilder requestBuilder = + ENTITIES_REQUEST_BUILDERS.actionSetWritable().valueParam(canWrite); + sendClientRequest(requestBuilder.build()); + } } diff --git a/gms/factories/src/main/java/com/linkedin/gms/factory/search/SearchServiceFactory.java b/gms/factories/src/main/java/com/linkedin/gms/factory/search/SearchServiceFactory.java index cf083265ea28b6..5c37bc8a8913a5 100644 --- a/gms/factories/src/main/java/com/linkedin/gms/factory/search/SearchServiceFactory.java +++ b/gms/factories/src/main/java/com/linkedin/gms/factory/search/SearchServiceFactory.java @@ -8,6 +8,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Primary; @Configuration @@ -18,6 +19,7 @@ public class SearchServiceFactory { private ElasticSearchService elasticSearchService; @Bean(name = "searchService") + @Primary @Nonnull protected SearchService getInstance() { return elasticSearchService; diff --git a/gms/impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java b/gms/impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java index aba57dd5c430d3..831eb317609679 100644 --- a/gms/impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java +++ b/gms/impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java @@ -66,6 +66,7 @@ public class EntityResource extends CollectionResourceTaskTemplate getBrowsePaths( */ @Action(name = "setWritable") @Nonnull - public Task setWriteable() { + public Task setWriteable(@ActionParam(PARAM_VALUE) @Optional("true") @Nonnull Boolean value) { log.info("setting entity resource to be writable"); return RestliUtils.toTask(() -> { - _entityService.setWritable(); + _entityService.setWritable(value); return null; }); } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java index d042804dd60629..3a21f26bc0fa61 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityService.java @@ -197,6 +197,30 @@ public Map getEntities(@Nonnull final Set urns, @Nonnull Set toEntity(entry.getValue()))); } + /** + * Produce metadata audit event and push. + * + * @param urn Urn to push + * @param oldAspectValue Value of aspect before the update. + * @param newAspectValue Value of aspect after the update + */ + public void produceMetadataAuditEvent(@Nonnull final Urn urn, @Nullable final RecordTemplate oldAspectValue, + @Nonnull final RecordTemplate newAspectValue) { + + final Snapshot newSnapshot = buildSnapshot(urn, newAspectValue); + Snapshot oldSnapshot = null; + if (oldAspectValue != null) { + oldSnapshot = buildSnapshot(urn, oldAspectValue); + } + + _producer.produceMetadataAuditEvent(urn, oldSnapshot, newSnapshot); + + // 4.1 Produce aspect specific MAE after a successful update + if (_emitAspectSpecificAuditEvent) { + _producer.produceAspectSpecificMetadataAuditEvent(urn, oldAspectValue, newAspectValue); + } + } + public RecordTemplate getLatestAspect(@Nonnull final Urn urn, @Nonnull final String aspectName) { log.debug(String.format("Invoked getLatestAspect with urn %s, aspect %s", urn, aspectName)); return getAspect(urn, aspectName, LATEST_ASPECT_VERSION); @@ -248,25 +272,6 @@ private void ingestSnapshotUnion(@Nonnull final Snapshot snapshotUnion, @Nonnull }); } - protected void produceMetadataAuditEvent( - @Nonnull final Urn urn, - @Nullable final RecordTemplate oldValue, - @Nonnull final RecordTemplate newValue) { - - final Snapshot newSnapshot = buildSnapshot(urn, newValue); - Snapshot oldSnapshot = null; - if (oldValue != null) { - oldSnapshot = buildSnapshot(urn, oldValue); - } - - _producer.produceMetadataAuditEvent(urn, oldSnapshot, newSnapshot); - - // 4.1 Produce aspect specific MAE after a successful update - if (_emitAspectSpecificAuditEvent) { - _producer.produceAspectSpecificMetadataAuditEvent(urn, oldValue, newValue); - } - } - private Snapshot buildSnapshot(@Nonnull final Urn urn, @Nonnull final RecordTemplate aspectValue) { final RecordTemplate keyAspectValue = buildKeyAspect(urn); return toSnapshotUnion( @@ -357,5 +362,5 @@ protected Set getEntityAspectNames(final String entityName) { return _entityToValidAspects.get(entityName); } - public abstract void setWritable(); + public abstract void setWritable(boolean canWrite); } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java index 0ef06beec703a0..d183f1830a11c7 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java @@ -70,8 +70,8 @@ public EbeanAspectDao(@Nonnull final EbeanServer server) { _server = server; } - public void setWritable() { - _canWrite = true; + public void setWritable(boolean canWrite) { + _canWrite = canWrite; } /** diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV2.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV2.java index 800ca312c3d3c3..756e841bf96805 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV2.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectV2.java @@ -22,6 +22,8 @@ */ @Getter @Setter +@NoArgsConstructor +@AllArgsConstructor @Entity @Table(name = "metadata_aspect_v2") public class EbeanAspectV2 extends Model { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanEntityService.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanEntityService.java index 4a1b7995d2fba2..6dce148e9c32a6 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanEntityService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanEntityService.java @@ -2,7 +2,6 @@ import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; -import com.linkedin.data.schema.RecordDataSchema; import com.linkedin.data.template.DataTemplateUtil; import com.linkedin.data.template.RecordTemplate; import com.linkedin.metadata.aspect.Aspect; @@ -11,8 +10,6 @@ import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.ListResult; import com.linkedin.metadata.event.EntityEventProducer; -import com.linkedin.metadata.models.AspectSpec; -import com.linkedin.metadata.models.EntitySpec; import com.linkedin.metadata.models.registry.EntityRegistry; import java.sql.Timestamp; import java.util.ArrayList; @@ -28,7 +25,8 @@ import lombok.Value; import lombok.extern.slf4j.Slf4j; -import static com.linkedin.metadata.PegasusUtils.*; +import static com.linkedin.metadata.entity.ebean.EbeanUtils.toAspectRecord; +import static com.linkedin.metadata.entity.ebean.EbeanUtils.toJsonAspect; /** @@ -88,7 +86,8 @@ public Map> getLatestAspects(@Nonnull final Set u _entityDao.batchGet(dbKeys).forEach((key, aspectEntry) -> { final Urn urn = toUrn(key.getUrn()); final String aspectName = key.getAspect(); - final RecordTemplate aspectRecord = toAspectRecord(urn, aspectName, aspectEntry.getMetadata()); + final RecordTemplate aspectRecord = + toAspectRecord(urn, aspectName, aspectEntry.getMetadata(), getEntityRegistry()); urnToAspects.putIfAbsent(urn, new ArrayList<>()); urnToAspects.get(urn).add(aspectRecord); }); @@ -118,7 +117,7 @@ public RecordTemplate getAspect(@Nonnull final Urn urn, @Nonnull final String as final EbeanAspectV2.PrimaryKey primaryKey = new EbeanAspectV2.PrimaryKey(urn.toString(), aspectName, version); final Optional maybeAspect = Optional.ofNullable(_entityDao.getAspect(primaryKey)); return maybeAspect - .map(ebeanAspect -> toAspectRecord(urn, aspectName, ebeanAspect.getMetadata())) + .map(ebeanAspect -> toAspectRecord(urn, aspectName, ebeanAspect.getMetadata(), getEntityRegistry())) .orElse(null); } @@ -134,7 +133,7 @@ public VersionedAspect getVersionedAspect(@Nonnull Urn urn, @Nonnull String aspe final EbeanAspectV2.PrimaryKey primaryKey = new EbeanAspectV2.PrimaryKey(urn.toString(), aspectName, version); final Optional maybeAspect = Optional.ofNullable(_entityDao.getAspect(primaryKey)); RecordTemplate aspect = maybeAspect - .map(ebeanAspect -> toAspectRecord(urn, aspectName, ebeanAspect.getMetadata())) + .map(ebeanAspect -> toAspectRecord(urn, aspectName, ebeanAspect.getMetadata(), getEntityRegistry())) .orElse(null); if (aspect == null) { @@ -169,10 +168,8 @@ public ListResult listLatestAspects( final List aspects = new ArrayList<>(); for (int i = 0; i < aspectMetadataList.getValues().size(); i++) { - aspects.add(toAspectRecord( - aspectMetadataList.getMetadata().getExtraInfos().get(i).getUrn(), - aspectName, - aspectMetadataList.getValues().get(i))); + aspects.add(toAspectRecord(aspectMetadataList.getMetadata().getExtraInfos().get(i).getUrn(), aspectName, + aspectMetadataList.getValues().get(i), getEntityRegistry())); } return new ListResult<>( @@ -211,7 +208,8 @@ private RecordTemplate ingestAspect( final EbeanAspectV2 latest = _entityDao.getLatestAspect(urn.toString(), aspectName); // 2. Compare the latest existing and new. - final RecordTemplate oldValue = latest == null ? null : toAspectRecord(urn, aspectName, latest.getMetadata()); + final RecordTemplate oldValue = + latest == null ? null : toAspectRecord(urn, aspectName, latest.getMetadata(), getEntityRegistry()); final RecordTemplate newValue = updateLambda.apply(Optional.ofNullable(oldValue)); // 3. Skip updating if there is no difference between existing and new. @@ -286,8 +284,8 @@ private RecordTemplate updateAspect( final AddAspectResult result = _entityDao.runInTransactionWithRetry(() -> { final EbeanAspectV2 oldAspect = _entityDao.getAspect(urn.toString(), aspectName, version); - final RecordTemplate oldValue = oldAspect == null ? null : toAspectRecord(urn, aspectName, - oldAspect.getMetadata()); + final RecordTemplate oldValue = oldAspect == null ? null + : toAspectRecord(urn, aspectName, oldAspect.getMetadata(), getEntityRegistry()); log.debug(String.format("Updating aspect with name %s, urn %s", aspectName, urn)); _entityDao.saveAspect( @@ -333,32 +331,8 @@ private static class AddAspectResult { RecordTemplate newValue; } - @Nonnull - protected static String toJsonAspect(@Nonnull final RecordTemplate aspectRecord) { - return RecordUtils.toJsonString(aspectRecord); - } - - @Nonnull - protected RecordTemplate toAspectRecord( - @Nonnull final Urn entityUrn, - @Nonnull final String aspectName, - @Nonnull final String jsonAspect) { - return toAspectRecord(urnToEntityName(entityUrn), aspectName, jsonAspect); - } - - protected RecordTemplate toAspectRecord( - @Nonnull final String entityName, - @Nonnull final String aspectName, - @Nonnull final String jsonAspect) { - final EntitySpec entitySpec = getEntityRegistry().getEntitySpec(entityName); - final AspectSpec aspectSpec = entitySpec.getAspectSpec(aspectName); - final RecordDataSchema aspectSchema = aspectSpec.getPegasusSchema(); - return RecordUtils.toRecordTemplate(getDataTemplateClassFromSchema(aspectSchema, RecordTemplate.class), jsonAspect); - } - - @Nonnull - public void setWritable() { + public void setWritable(boolean canWrite) { log.debug("Enabling writes"); - _entityDao.setWritable(); + _entityDao.setWritable(canWrite); } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanUtils.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanUtils.java new file mode 100644 index 00000000000000..abcb7fed703adc --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanUtils.java @@ -0,0 +1,37 @@ +package com.linkedin.metadata.entity.ebean; + +import com.linkedin.common.urn.Urn; +import com.linkedin.data.schema.RecordDataSchema; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.metadata.PegasusUtils; +import com.linkedin.metadata.dao.utils.RecordUtils; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.models.registry.EntityRegistry; +import javax.annotation.Nonnull; + +import static com.linkedin.metadata.PegasusUtils.getDataTemplateClassFromSchema; + + +public class EbeanUtils { + private EbeanUtils() { + } + + @Nonnull + public static String toJsonAspect(@Nonnull final RecordTemplate aspectRecord) { + return RecordUtils.toJsonString(aspectRecord); + } + + @Nonnull + public static RecordTemplate toAspectRecord(@Nonnull final Urn entityUrn, @Nonnull final String aspectName, + @Nonnull final String jsonAspect, @Nonnull final EntityRegistry entityRegistry) { + return toAspectRecord(PegasusUtils.urnToEntityName(entityUrn), aspectName, jsonAspect, entityRegistry); + } + public static RecordTemplate toAspectRecord(@Nonnull final String entityName, @Nonnull final String aspectName, + @Nonnull final String jsonAspect, @Nonnull final EntityRegistry entityRegistry) { + final EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName); + final AspectSpec aspectSpec = entitySpec.getAspectSpec(aspectName); + final RecordDataSchema aspectSchema = aspectSpec.getPegasusSchema(); + return RecordUtils.toRecordTemplate(getDataTemplateClassFromSchema(aspectSchema, RecordTemplate.class), jsonAspect); + } +} diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/GraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/GraphService.java index df284fff3b6dc1..3ed4a15687437a 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/GraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/GraphService.java @@ -30,4 +30,6 @@ void removeEdgesFromNode( @Nonnull final RelationshipFilter relationshipFilter); void configure(); + + void clear(); } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/Neo4jGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/Neo4jGraphService.java index 6c362cdfbbf5e2..f6820a1f3c2968 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/Neo4jGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/Neo4jGraphService.java @@ -189,6 +189,11 @@ public void configure() { // Do nothing } + @Override + public void clear() { + removeNodesMatchingLabel(".*"); + } + // visible for testing @Nonnull Statement buildStatement(@Nonnull String queryTemplate, @Nonnull Map params) { diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java index 5cd7272df91655..8deaec093e6049 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java @@ -35,6 +35,9 @@ import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; + @Slf4j @RequiredArgsConstructor @@ -218,4 +221,15 @@ public void configure() { return; } + + @Override + public void clear() { + DeleteByQueryRequest deleteRequest = + new DeleteByQueryRequest(_indexConvention.getIndexName(INDEX_NAME)).setQuery(QueryBuilders.matchAllQuery()); + try { + searchClient.deleteByQuery(deleteRequest, RequestOptions.DEFAULT); + } catch (Exception e) { + log.error("Failed to clear graph service: {}", e.toString()); + } + } } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/SearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/SearchService.java index da6399902547f1..66dc64e8427f35 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/SearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/SearchService.java @@ -15,6 +15,11 @@ public interface SearchService { void configure(); + /** + * Clear all data within the service + */ + void clear(); + /** * Updates or inserts the given search document. * diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java index 2df6c9a05bec92..18a7c27d789cb0 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java @@ -32,6 +32,11 @@ public void configure() { indexBuilders.buildAll(); } + @Override + public void clear() { + esWriteDAO.clear(); + } + @Override public void upsertDocument(@Nonnull String entityName, @Nonnull String document, @Nonnull String docId) { log.debug(String.format("Upserting Search document entityName: %s, document: %s, docId: %s", entityName, diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java index ffc8ac948d4d0f..0e2a94488cade9 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/update/ESWriteDAO.java @@ -2,7 +2,9 @@ import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.metadata.utils.elasticsearch.IndexConvention; +import java.io.IOException; import javax.annotation.Nonnull; +import lombok.extern.slf4j.Slf4j; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.delete.DeleteRequest; @@ -10,13 +12,19 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.GetIndexRequest; +import org.elasticsearch.client.indices.GetIndexResponse; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; +@Slf4j public class ESWriteDAO { private final EntityRegistry entityRegistry; + private final RestHighLevelClient searchClient; private final BulkProcessor bulkProcessor; private final IndexConvention indexConvention; @@ -24,6 +32,7 @@ public ESWriteDAO(EntityRegistry entityRegistry, RestHighLevelClient searchClien int bulkRequestsLimit, int bulkFlushPeriod, int numRetries, long retryInterval) { this.entityRegistry = entityRegistry; this.indexConvention = indexConvention; + this.searchClient = searchClient; this.bulkProcessor = BulkProcessor.builder( (request, bulkListener) -> searchClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), BulkListener.getInstance()) @@ -43,9 +52,8 @@ public ESWriteDAO(EntityRegistry entityRegistry, RestHighLevelClient searchClien public void upsertDocument(@Nonnull String entityName, @Nonnull String document, @Nonnull String docId) { final String indexName = indexConvention.getIndexName(entityRegistry.getEntitySpec(entityName)); final IndexRequest indexRequest = new IndexRequest(indexName).id(docId).source(document, XContentType.JSON); - final UpdateRequest updateRequest = new UpdateRequest(indexName, docId).doc(document, XContentType.JSON) - .detectNoop(false) - .upsert(indexRequest); + final UpdateRequest updateRequest = + new UpdateRequest(indexName, docId).doc(document, XContentType.JSON).detectNoop(false).upsert(indexRequest); bulkProcessor.add(updateRequest); } @@ -59,4 +67,27 @@ public void deleteDocument(@Nonnull String entityName, @Nonnull String docId) { final String indexName = indexConvention.getIndexName(entityRegistry.getEntitySpec(entityName)); bulkProcessor.add(new DeleteRequest(indexName).id(docId)); } + + /** + * Clear all documents in all the indices + */ + public void clear() { + String[] indices = getIndices(indexConvention.getAllIndicesPattern()); + DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(indices).setQuery(QueryBuilders.matchAllQuery()); + try { + searchClient.deleteByQuery(deleteRequest, RequestOptions.DEFAULT); + } catch (Exception e) { + log.error("Failed to delete content of search indices: {}", e.toString()); + } + } + + private String[] getIndices(String pattern) { + try { + GetIndexResponse response = searchClient.indices().get(new GetIndexRequest(pattern), RequestOptions.DEFAULT); + return response.getIndices(); + } catch (IOException e) { + log.error("Failed to get indices using pattern {}", pattern); + return new String[]{}; + } + } } diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/elasticsearch/IndexConvention.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/elasticsearch/IndexConvention.java index bd1671886d6277..2da0eda99c0c08 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/utils/elasticsearch/IndexConvention.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/elasticsearch/IndexConvention.java @@ -2,6 +2,7 @@ import com.linkedin.data.template.RecordTemplate; import com.linkedin.metadata.models.EntitySpec; +import java.util.Optional; import javax.annotation.Nonnull; @@ -9,6 +10,8 @@ * The convention for naming search indices */ public interface IndexConvention { + Optional getPrefix(); + @Nonnull String getIndexName(Class documentClass); @@ -17,4 +20,7 @@ public interface IndexConvention { @Nonnull String getIndexName(String baseIndexName); + + @Nonnull + String getAllIndicesPattern(); } diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/elasticsearch/IndexConventionImpl.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/elasticsearch/IndexConventionImpl.java index 7a315c8fd67937..412c69f53baf8d 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/utils/elasticsearch/IndexConventionImpl.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/elasticsearch/IndexConventionImpl.java @@ -14,18 +14,25 @@ public class IndexConventionImpl implements IndexConvention { private final Map indexNameMapping = new HashMap<>(); private final Optional _prefix; + private final String _getAllIndicesPattern; private final static String VERSION = "v2"; private final static String SUFFIX = "index"; public IndexConventionImpl(@Nullable String prefix) { _prefix = StringUtils.isEmpty(prefix) ? Optional.empty() : Optional.of(prefix); + _getAllIndicesPattern = _prefix.map(p -> p + "_").orElse("") + "*" + SUFFIX + "_" + VERSION; } private String createIndexName(String baseName) { return (_prefix.map(prefix -> prefix + "_").orElse("") + baseName).toLowerCase(); } + @Override + public Optional getPrefix() { + return _prefix; + } + @Nonnull @Override public String getIndexName(Class documentClass) { @@ -43,4 +50,10 @@ public String getIndexName(EntitySpec entitySpec) { public String getIndexName(String baseIndexName) { return indexNameMapping.computeIfAbsent(baseIndexName, this::createIndexName); } + + @Nonnull + @Override + public String getAllIndicesPattern() { + return _getAllIndicesPattern; + } }