11package com .linkedin .metadata .entity ;
22
3+ import com .codahale .metrics .Timer ;
34import com .google .common .collect .ImmutableList ;
45import com .google .common .collect .Streams ;
56import com .linkedin .common .AuditStamp ;
2627import com .linkedin .metadata .utils .EntityKeyUtils ;
2728import com .linkedin .metadata .utils .GenericAspectUtils ;
2829import com .linkedin .metadata .utils .PegasusUtils ;
30+ import com .linkedin .metadata .utils .metrics .MetricUtils ;
2931import com .linkedin .mxe .MetadataAuditOperation ;
3032import com .linkedin .mxe .MetadataChangeLog ;
3133import com .linkedin .mxe .MetadataChangeProposal ;
3840import java .util .Map ;
3941import java .util .Optional ;
4042import java .util .Set ;
43+ import java .util .function .Function ;
4144import java .util .stream .Collectors ;
4245import javax .annotation .Nonnull ;
4346import javax .annotation .Nullable ;
47+ import lombok .Getter ;
48+ import lombok .Setter ;
49+ import lombok .Value ;
4450import lombok .extern .slf4j .Slf4j ;
4551
4652import static com .linkedin .metadata .Constants .ASPECT_LATEST_VERSION ;
@@ -88,7 +94,10 @@ public abstract class EntityService {
8894 private final EntityEventProducer _producer ;
8995 private final EntityRegistry _entityRegistry ;
9096 private final Map <String , Set <String >> _entityToValidAspects ;
91- private Boolean _emitAspectSpecificAuditEvent = false ;
97+ @ Getter
98+ @ Setter
99+ private RetentionService retentionService ;
100+ private Boolean _alwaysEmitAuditEvent = false ;
92101 public static final String DEFAULT_RUN_ID = "no-run-id-provided" ;
93102 public static final String BROWSE_PATHS = "browsePaths" ;
94103 public static final String DATA_PLATFORM_INSTANCE = "dataPlatformInstance" ;
@@ -107,7 +116,7 @@ protected EntityService(@Nonnull final EntityEventProducer producer, @Nonnull fi
107116 * @param aspectNames aspects to fetch for each urn in urns set
108117 * @return a map of provided {@link Urn} to a List containing the requested aspects.
109118 */
110- protected abstract Map <Urn , List <RecordTemplate >> getLatestAspects (@ Nonnull final Set <Urn > urns ,
119+ public abstract Map <Urn , List <RecordTemplate >> getLatestAspects (@ Nonnull final Set <Urn > urns ,
111120 @ Nonnull final Set <String > aspectNames );
112121
113122 /**
@@ -146,6 +155,22 @@ public abstract VersionedAspect getVersionedAspect(@Nonnull final Urn urn, @Nonn
146155 public abstract ListResult <RecordTemplate > listLatestAspects (@ Nonnull final String entityName ,
147156 @ Nonnull final String aspectName , final int start , int count );
148157
158+ /**
159+ * Checks whether there is an actual update to the aspect by applying the updateLambda
160+ * If there is an update, push the new version into the local DB.
161+ * Otherwise, do not push the new version, but just update the system metadata.
162+ *
163+ * @param urn an urn associated with the new aspect
164+ * @param aspectName name of the aspect being inserted
165+ * @param updateLambda Function to apply to the latest version of the aspect to get the updated version
166+ * @param auditStamp an {@link AuditStamp} containing metadata about the writer & current time * @param providedSystemMetadata
167+ * @return Details about the new and old version of the aspect
168+ */
169+ @ Nonnull
170+ protected abstract UpdateAspectResult ingestAspectToLocalDB (@ Nonnull final Urn urn , @ Nonnull final String aspectName ,
171+ @ Nonnull final Function <Optional <RecordTemplate >, RecordTemplate > updateLambda ,
172+ @ Nonnull final AuditStamp auditStamp , @ Nonnull final SystemMetadata providedSystemMetadata );
173+
149174 /**
150175 * Ingests (inserts) a new version of an entity aspect & emits a {@link com.linkedin.mxe.MetadataAuditEvent}.
151176 *
@@ -159,8 +184,47 @@ public abstract ListResult<RecordTemplate> listLatestAspects(@Nonnull final Stri
159184 * @param systemMetadata
160185 * @return the {@link RecordTemplate} representation of the written aspect object
161186 */
162- public abstract RecordTemplate ingestAspect (@ Nonnull final Urn urn , @ Nonnull final String aspectName ,
163- @ Nonnull final RecordTemplate newValue , @ Nonnull final AuditStamp auditStamp , SystemMetadata systemMetadata );
187+ public RecordTemplate ingestAspect (@ Nonnull final Urn urn , @ Nonnull final String aspectName ,
188+ @ Nonnull final RecordTemplate newValue , @ Nonnull final AuditStamp auditStamp , SystemMetadata systemMetadata ) {
189+
190+ log .debug ("Invoked ingestAspect with urn: {}, aspectName: {}, newValue: {}" , urn , aspectName , newValue );
191+
192+ if (!urn .toString ().trim ().equals (urn .toString ())) {
193+ throw new IllegalArgumentException ("Error: cannot provide an URN with leading or trailing whitespace" );
194+ }
195+
196+ Timer .Context ingestToLocalDBTimer = MetricUtils .timer (this .getClass (), "ingestAspectToLocalDB" ).time ();
197+ UpdateAspectResult result = ingestAspectToLocalDB (urn , aspectName , ignored -> newValue , auditStamp , systemMetadata );
198+ ingestToLocalDBTimer .stop ();
199+
200+ final RecordTemplate oldValue = result .getOldValue ();
201+ final RecordTemplate updatedValue = result .getNewValue ();
202+
203+ // Apply retention policies asynchronously if there was an update to existing aspect value
204+ if (oldValue != updatedValue && oldValue != null && retentionService != null ) {
205+ retentionService .applyRetention (urn , aspectName ,
206+ Optional .of (new RetentionService .RetentionContext (Optional .of (result .maxVersion ))));
207+ }
208+
209+ // Produce MAE after a successful update
210+ if (oldValue != updatedValue || _alwaysEmitAuditEvent ) {
211+ log .debug (String .format ("Producing MetadataAuditEvent for ingested aspect %s, urn %s" , aspectName , urn ));
212+ Timer .Context produceMAETimer = MetricUtils .timer (this .getClass (), "produceMAE" ).time ();
213+ if (aspectName .equals (getKeyAspectName (urn ))) {
214+ produceMetadataAuditEventForKey (urn , result .getNewSystemMetadata ());
215+ } else {
216+ produceMetadataAuditEvent (urn , oldValue , updatedValue , result .getOldSystemMetadata (),
217+ result .getNewSystemMetadata (), MetadataAuditOperation .UPDATE );
218+ }
219+ produceMAETimer .stop ();
220+ } else {
221+ log .debug (
222+ String .format ("Skipped producing MetadataAuditEvent for ingested aspect %s, urn %s. Aspect has not changed." ,
223+ aspectName , urn ));
224+ }
225+
226+ return updatedValue ;
227+ }
164228
165229 public RecordTemplate ingestAspect (@ Nonnull final Urn urn , @ Nonnull final String aspectName ,
166230 @ Nonnull final RecordTemplate newValue , @ Nonnull final AuditStamp auditStamp ) {
@@ -171,6 +235,105 @@ public RecordTemplate ingestAspect(@Nonnull final Urn urn, @Nonnull final String
171235 return ingestAspect (urn , aspectName , newValue , auditStamp , generatedSystemMetadata );
172236 }
173237
238+ public IngestProposalResult ingestProposal (@ Nonnull MetadataChangeProposal metadataChangeProposal , AuditStamp auditStamp ) {
239+
240+ log .debug ("entity type = {}" , metadataChangeProposal .getEntityType ());
241+ EntitySpec entitySpec = getEntityRegistry ().getEntitySpec (metadataChangeProposal .getEntityType ());
242+ log .debug ("entity spec = {}" , entitySpec );
243+
244+ Urn entityUrn = EntityKeyUtils .getUrnFromProposal (metadataChangeProposal , entitySpec .getKeyAspectSpec ());
245+
246+ if (metadataChangeProposal .getChangeType () != ChangeType .UPSERT ) {
247+ throw new UnsupportedOperationException ("Only upsert operation is supported" );
248+ }
249+
250+ if (!metadataChangeProposal .hasAspectName () || !metadataChangeProposal .hasAspect ()) {
251+ throw new UnsupportedOperationException ("Aspect and aspect name is required for create and update operations" );
252+ }
253+
254+ AspectSpec aspectSpec = entitySpec .getAspectSpec (metadataChangeProposal .getAspectName ());
255+
256+ if (aspectSpec == null ) {
257+ throw new RuntimeException (
258+ String .format ("Unknown aspect %s for entity %s" , metadataChangeProposal .getAspectName (),
259+ metadataChangeProposal .getEntityType ()));
260+ }
261+
262+ log .debug ("aspect spec = {}" , aspectSpec );
263+
264+ RecordTemplate aspect ;
265+ try {
266+ aspect = GenericAspectUtils .deserializeAspect (metadataChangeProposal .getAspect ().getValue (),
267+ metadataChangeProposal .getAspect ().getContentType (), aspectSpec );
268+ ValidationUtils .validateOrThrow (aspect );
269+ } catch (ModelConversionException e ) {
270+ throw new RuntimeException (
271+ String .format ("Could not deserialize {} for aspect {}" , metadataChangeProposal .getAspect ().getValue (),
272+ metadataChangeProposal .getAspectName ()));
273+ }
274+ log .debug ("aspect = {}" , aspect );
275+
276+ SystemMetadata systemMetadata = metadataChangeProposal .getSystemMetadata ();
277+ if (systemMetadata == null ) {
278+ systemMetadata = new SystemMetadata ();
279+ systemMetadata .setRunId (DEFAULT_RUN_ID );
280+ systemMetadata .setLastObserved (System .currentTimeMillis ());
281+ }
282+ systemMetadata .setRegistryName (aspectSpec .getRegistryName ());
283+ systemMetadata .setRegistryVersion (aspectSpec .getRegistryVersion ().toString ());
284+
285+ RecordTemplate oldAspect = null ;
286+ SystemMetadata oldSystemMetadata = null ;
287+ RecordTemplate newAspect = aspect ;
288+ SystemMetadata newSystemMetadata = systemMetadata ;
289+
290+ if (!aspectSpec .isTimeseries ()) {
291+ Timer .Context ingestToLocalDBTimer = MetricUtils .timer (this .getClass (), "ingestProposalToLocalDB" ).time ();
292+ UpdateAspectResult result =
293+ ingestAspectToLocalDB (entityUrn , metadataChangeProposal .getAspectName (), ignored -> aspect , auditStamp ,
294+ systemMetadata );
295+ ingestToLocalDBTimer .stop ();
296+ oldAspect = result .getOldValue ();
297+ oldSystemMetadata = result .getOldSystemMetadata ();
298+ newAspect = result .getNewValue ();
299+ newSystemMetadata = result .getNewSystemMetadata ();
300+ // Apply retention policies asynchronously if there was an update to existing aspect value
301+ if (oldAspect != newAspect && oldAspect != null && retentionService != null ) {
302+ retentionService .applyRetention (entityUrn , aspectSpec .getName (),
303+ Optional .of (new RetentionService .RetentionContext (Optional .of (result .maxVersion ))));
304+ }
305+ }
306+
307+ if (oldAspect != newAspect || getAlwaysEmitAuditEvent ()) {
308+ log .debug (String .format ("Producing MetadataChangeLog for ingested aspect %s, urn %s" ,
309+ metadataChangeProposal .getAspectName (), entityUrn ));
310+
311+ final MetadataChangeLog metadataChangeLog = new MetadataChangeLog (metadataChangeProposal .data ());
312+ if (oldAspect != null ) {
313+ metadataChangeLog .setPreviousAspectValue (GenericAspectUtils .serializeAspect (oldAspect ));
314+ }
315+ if (oldSystemMetadata != null ) {
316+ metadataChangeLog .setPreviousSystemMetadata (oldSystemMetadata );
317+ }
318+ if (newAspect != null ) {
319+ metadataChangeLog .setAspect (GenericAspectUtils .serializeAspect (newAspect ));
320+ }
321+ if (newSystemMetadata != null ) {
322+ metadataChangeLog .setSystemMetadata (newSystemMetadata );
323+ }
324+
325+ log .debug (String .format ("Serialized MCL event: %s" , metadataChangeLog ));
326+ // Since only timeseries aspects are ingested as of now, simply produce mae event for it
327+ produceMetadataChangeLog (entityUrn , aspectSpec , metadataChangeLog );
328+ } else {
329+ log .debug (
330+ String .format ("Skipped producing MetadataAuditEvent for ingested aspect %s, urn %s. Aspect has not changed." ,
331+ metadataChangeProposal .getAspectName (), entityUrn ));
332+ }
333+
334+ return new IngestProposalResult (entityUrn , oldAspect != newAspect );
335+ }
336+
174337 /**
175338 * Updates a particular version of an aspect & optionally emits a {@link com.linkedin.mxe.MetadataAuditEvent}.
176339 *
@@ -483,12 +646,12 @@ private Map<String, Set<String>> buildEntityToValidAspects(final EntityRegistry
483646 entry -> entry .getAspectSpecs ().stream ().map (AspectSpec ::getName ).collect (Collectors .toSet ())));
484647 }
485648
486- public Boolean getEmitAspectSpecificAuditEvent () {
487- return _emitAspectSpecificAuditEvent ;
649+ public Boolean getAlwaysEmitAuditEvent () {
650+ return _alwaysEmitAuditEvent ;
488651 }
489652
490- public void setEmitAspectSpecificAuditEvent (Boolean emitAspectSpecificAuditEvent ) {
491- _emitAspectSpecificAuditEvent = emitAspectSpecificAuditEvent ;
653+ public void setAlwaysEmitAuditEvent (Boolean alwaysEmitAuditEvent ) {
654+ _alwaysEmitAuditEvent = alwaysEmitAuditEvent ;
492655 }
493656
494657 public EntityRegistry getEntityRegistry () {
@@ -505,8 +668,6 @@ protected Set<String> getEntityAspectNames(final String entityName) {
505668
506669 public abstract void setWritable (boolean canWrite );
507670
508- public abstract Urn ingestProposal (MetadataChangeProposal metadataChangeProposal , AuditStamp auditStamp );
509-
510671 public RollbackRunResult rollbackRun (List <AspectRowSummary > aspectRows , String runId ) {
511672 return rollbackWithConditions (aspectRows , Collections .singletonMap ("runId" , runId ));
512673 }
@@ -517,4 +678,21 @@ public abstract RollbackRunResult rollbackWithConditions(List<AspectRowSummary>
517678 public abstract RollbackRunResult deleteUrn (Urn urn );
518679
519680 public abstract Boolean exists (Urn urn );
681+
682+ @ Value
683+ public static class UpdateAspectResult {
684+ Urn urn ;
685+ RecordTemplate oldValue ;
686+ RecordTemplate newValue ;
687+ SystemMetadata oldSystemMetadata ;
688+ SystemMetadata newSystemMetadata ;
689+ MetadataAuditOperation operation ;
690+ long maxVersion ;
691+ }
692+
693+ @ Value
694+ public static class IngestProposalResult {
695+ Urn urn ;
696+ boolean didUpdate ;
697+ }
520698}
0 commit comments