a {@link JobRepository} (bean name "jobRepository" of type
* {@link org.springframework.batch.core.repository.support.ResourcelessJobRepository})
- *
a {@link JobRegistry} (bean name "jobRegistry" of type
- * {@link org.springframework.batch.core.configuration.support.MapJobRegistry})
*
a {@link org.springframework.batch.core.launch.JobOperator} (bean name
* "jobOperator" of type
* {@link org.springframework.batch.core.launch.support.TaskExecutorJobOperator})
@@ -134,7 +131,7 @@
*
*
*
+"org.springframework.batch.core.launch.support.JobOperatorFactoryBean">
*
*
*
@@ -174,6 +171,13 @@
*/
String taskExecutorRef() default "taskExecutor";
+ /**
+ * Set the job registry to use in the job operator.
+ * @return the bean name of the job registry to use. Defaults to
+ * {@literal jobRegistry}
+ */
+ String jobRegistryRef() default "jobRegistry";
+
/**
* Set the transaction manager to use in the job operator.
* @return the bean name of the transaction manager to use. Defaults to
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/DefaultBatchConfiguration.java b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/DefaultBatchConfiguration.java
index d1b6dc9cd6..c208b06715 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/DefaultBatchConfiguration.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/DefaultBatchConfiguration.java
@@ -15,9 +15,8 @@
*/
package org.springframework.batch.core.configuration.support;
-import org.springframework.batch.core.job.DefaultJobKeyGenerator;
-import org.springframework.batch.core.job.JobInstance;
-import org.springframework.batch.core.job.JobKeyGenerator;
+import org.springframework.batch.core.configuration.DuplicateJobException;
+import org.springframework.batch.core.job.Job;
import org.springframework.batch.core.configuration.BatchConfigurationException;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.converter.DefaultJobParametersConverter;
@@ -37,7 +36,6 @@
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
-import org.springframework.transaction.annotation.Isolation;
/**
* Base {@link Configuration} class that provides common infrastructure beans for enabling
@@ -48,7 +46,6 @@
*
*
*
a {@link ResourcelessJobRepository} named "jobRepository"
- *
a {@link MapJobRegistry} named "jobRegistry"
*
a {@link TaskExecutorJobOperator} named "jobOperator"
*
a {@link org.springframework.batch.core.scope.StepScope} named "stepScope"
*
a {@link org.springframework.batch.core.scope.JobScope} named "jobScope"
@@ -93,16 +90,10 @@ public JobRepository jobRepository() {
}
@Bean
- public JobRegistry jobRegistry() {
- return new MapJobRegistry();
- }
-
- @Bean
- public JobOperator jobOperator(JobRepository jobRepository, JobRegistry jobRegistry)
- throws BatchConfigurationException {
+ public JobOperator jobOperator(JobRepository jobRepository) throws BatchConfigurationException {
JobOperatorFactoryBean jobOperatorFactoryBean = new JobOperatorFactoryBean();
jobOperatorFactoryBean.setJobRepository(jobRepository);
- jobOperatorFactoryBean.setJobRegistry(jobRegistry);
+ jobOperatorFactoryBean.setJobRegistry(getJobRegistry());
jobOperatorFactoryBean.setTransactionManager(getTransactionManager());
jobOperatorFactoryBean.setJobParametersConverter(getJobParametersConverter());
jobOperatorFactoryBean.setTaskExecutor(getTaskExecutor());
@@ -115,6 +106,19 @@ public JobOperator jobOperator(JobRepository jobRepository, JobRegistry jobRegis
}
}
+ protected JobRegistry getJobRegistry() {
+ MapJobRegistry jobRegistry = new MapJobRegistry();
+ this.applicationContext.getBeansOfType(Job.class).values().forEach(job -> {
+ try {
+ jobRegistry.register(job);
+ }
+ catch (DuplicateJobException e) {
+ throw new BatchConfigurationException(e);
+ }
+ });
+ return jobRegistry;
+ }
+
/**
* Return the transaction manager to use for the job operator. Defaults to
* {@link ResourcelessTransactionManager}.
@@ -145,33 +149,4 @@ protected JobParametersConverter getJobParametersConverter() {
return new DefaultJobParametersConverter();
}
- /**
- * Return the value of the {@code validateTransactionState} parameter. Defaults to
- * {@code true}.
- * @return true if the transaction state should be validated, false otherwise
- */
- protected boolean getValidateTransactionState() {
- return true;
- }
-
- /**
- * Return the transaction isolation level when creating job executions. Defaults to
- * {@link Isolation#SERIALIZABLE}.
- * @return the transaction isolation level when creating job executions
- */
- protected Isolation getIsolationLevelForCreate() {
- return Isolation.SERIALIZABLE;
- }
-
- /**
- * A custom implementation of the {@link JobKeyGenerator}. The default, if not
- * injected, is the {@link DefaultJobKeyGenerator}.
- * @return the generator that creates the key used in identifying {@link JobInstance}
- * objects
- * @since 5.1
- */
- protected JobKeyGenerator getJobKeyGenerator() {
- return new DefaultJobKeyGenerator();
- }
-
}
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/JdbcDefaultBatchConfiguration.java b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/JdbcDefaultBatchConfiguration.java
index 172cb98809..2b3cc40e15 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/JdbcDefaultBatchConfiguration.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/JdbcDefaultBatchConfiguration.java
@@ -16,7 +16,6 @@
package org.springframework.batch.core.configuration.support;
import org.springframework.batch.core.configuration.BatchConfigurationException;
-import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.converter.DateToStringConverter;
import org.springframework.batch.core.converter.LocalDateTimeToStringConverter;
import org.springframework.batch.core.converter.LocalDateToStringConverter;
@@ -25,6 +24,9 @@
import org.springframework.batch.core.converter.StringToLocalDateConverter;
import org.springframework.batch.core.converter.StringToLocalDateTimeConverter;
import org.springframework.batch.core.converter.StringToLocalTimeConverter;
+import org.springframework.batch.core.job.DefaultJobKeyGenerator;
+import org.springframework.batch.core.job.JobInstance;
+import org.springframework.batch.core.job.JobKeyGenerator;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.repository.ExecutionContextSerializer;
import org.springframework.batch.core.repository.JobRepository;
@@ -46,6 +48,7 @@
import org.springframework.jdbc.support.MetaDataAccessException;
import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.annotation.Isolation;
import javax.sql.DataSource;
import java.nio.charset.Charset;
@@ -61,7 +64,6 @@
*
*
*
a {@link JobRepository} named "jobRepository"
- *
a {@link JobRegistry} named "jobRegistry"
*
a {@link JobOperator} named "jobOperator"
*
a {@link org.springframework.batch.core.scope.StepScope} named "stepScope"
*
a {@link org.springframework.batch.core.scope.JobScope} named "jobScope"
@@ -257,4 +259,33 @@ protected ConfigurableConversionService getConversionService() {
return conversionService;
}
+ /**
+ * Return the value of the {@code validateTransactionState} parameter. Defaults to
+ * {@code true}.
+ * @return true if the transaction state should be validated, false otherwise
+ */
+ protected boolean getValidateTransactionState() {
+ return true;
+ }
+
+ /**
+ * Return the transaction isolation level when creating job executions. Defaults to
+ * {@link Isolation#SERIALIZABLE}.
+ * @return the transaction isolation level when creating job executions
+ */
+ protected Isolation getIsolationLevelForCreate() {
+ return Isolation.SERIALIZABLE;
+ }
+
+ /**
+ * A custom implementation of the {@link JobKeyGenerator}. The default, if not
+ * injected, is the {@link DefaultJobKeyGenerator}.
+ * @return the generator that creates the key used in identifying {@link JobInstance}
+ * objects
+ * @since 5.1
+ */
+ protected JobKeyGenerator getJobKeyGenerator() {
+ return new DefaultJobKeyGenerator();
+ }
+
}
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MongoDefaultBatchConfiguration.java b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MongoDefaultBatchConfiguration.java
index 7f5cbb25e2..fdb18c63d3 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MongoDefaultBatchConfiguration.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/configuration/support/MongoDefaultBatchConfiguration.java
@@ -16,7 +16,9 @@
package org.springframework.batch.core.configuration.support;
import org.springframework.batch.core.configuration.BatchConfigurationException;
-import org.springframework.batch.core.configuration.JobRegistry;
+import org.springframework.batch.core.job.DefaultJobKeyGenerator;
+import org.springframework.batch.core.job.JobInstance;
+import org.springframework.batch.core.job.JobKeyGenerator;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MongoJobRepositoryFactoryBean;
@@ -24,6 +26,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoTransactionManager;
import org.springframework.data.mongodb.core.MongoOperations;
+import org.springframework.transaction.annotation.Isolation;
/**
* Base {@link Configuration} class that provides common MongoDB-based infrastructure
@@ -34,7 +37,6 @@
*
*
*
a {@link JobRepository} named "jobRepository"
- *
a {@link JobRegistry} named "jobRegistry"
*
a {@link JobOperator} named "jobOperator"
*
a {@link org.springframework.batch.core.scope.StepScope} named "stepScope"
*
a {@link org.springframework.batch.core.scope.JobScope} named "jobScope"
@@ -117,4 +119,33 @@ protected MongoTransactionManager getTransactionManager() {
return this.applicationContext.getBean("transactionManager", MongoTransactionManager.class);
}
+ /**
+ * Return the value of the {@code validateTransactionState} parameter. Defaults to
+ * {@code true}.
+ * @return true if the transaction state should be validated, false otherwise
+ */
+ protected boolean getValidateTransactionState() {
+ return true;
+ }
+
+ /**
+ * Return the transaction isolation level when creating job executions. Defaults to
+ * {@link Isolation#SERIALIZABLE}.
+ * @return the transaction isolation level when creating job executions
+ */
+ protected Isolation getIsolationLevelForCreate() {
+ return Isolation.SERIALIZABLE;
+ }
+
+ /**
+ * A custom implementation of the {@link JobKeyGenerator}. The default, if not
+ * injected, is the {@link DefaultJobKeyGenerator}.
+ * @return the generator that creates the key used in identifying {@link JobInstance}
+ * objects
+ * @since 5.1
+ */
+ protected JobKeyGenerator getJobKeyGenerator() {
+ return new DefaultJobKeyGenerator();
+ }
+
}
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/job/AbstractJob.java b/spring-batch-core/src/main/java/org/springframework/batch/core/job/AbstractJob.java
index 7c60e2da7a..9ca23315f4 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/job/AbstractJob.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/job/AbstractJob.java
@@ -37,6 +37,7 @@
import org.springframework.batch.core.job.parameters.JobParametersValidator;
import org.springframework.batch.core.listener.JobExecutionListener;
import org.springframework.batch.core.SpringBatchVersion;
+import org.springframework.batch.core.observability.jfr.events.job.JobExecutionEvent;
import org.springframework.batch.core.step.Step;
import org.springframework.batch.core.step.StepExecution;
import org.springframework.batch.core.launch.NoSuchJobException;
@@ -277,6 +278,9 @@ public final void execute(JobExecution execution) {
}
JobSynchronizationManager.register(execution);
+ JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(execution.getJobInstance().getJobName(),
+ execution.getJobInstance().getId(), execution.getId());
+ jobExecutionEvent.begin();
String activeJobMeterName = "job.active";
LongTaskTimer longTaskTimer = BatchMetrics.createLongTaskTimer(this.meterRegistry, activeJobMeterName,
"Active jobs", Tag.of(BatchMetrics.METRICS_PREFIX + activeJobMeterName + ".name",
@@ -349,6 +353,8 @@ public final void execute(JobExecution execution) {
execution.setExitStatus(exitStatus.and(newExitStatus));
}
stopObservation(execution, observation);
+ jobExecutionEvent.exitStatus = execution.getExitStatus().getExitCode();
+ jobExecutionEvent.commit();
longTaskTimerSample.stop();
execution.setEndTime(LocalDateTime.now());
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/job/Job.java b/spring-batch-core/src/main/java/org/springframework/batch/core/job/Job.java
index 80fdd5583b..22c91a5651 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/job/Job.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/job/Job.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2006-2022 the original author or authors.
+ * Copyright 2006-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,9 +28,18 @@
* @author Dave Syer
* @author Mahmoud Ben Hassine
*/
+@FunctionalInterface
public interface Job {
- String getName();
+ /**
+ * The name of the job. This is used to distinguish between different jobs and must be
+ * unique within the job repository. If not explicitly set, the name will default to
+ * the fully qualified class name.
+ * @return the name of the job (never {@code null})
+ */
+ default String getName() {
+ return this.getClass().getName();
+ }
/**
* Flag to indicate if this job can be restarted, at least in principle.
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/JobOperatorFactoryBean.java b/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/JobOperatorFactoryBean.java
index 648e0ac85b..82c50dd82d 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/JobOperatorFactoryBean.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/JobOperatorFactoryBean.java
@@ -22,14 +22,22 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.aop.framework.ProxyFactory;
+import org.springframework.batch.core.configuration.BatchConfigurationException;
+import org.springframework.batch.core.configuration.DuplicateJobException;
import org.springframework.batch.core.configuration.JobRegistry;
+import org.springframework.batch.core.configuration.support.MapJobRegistry;
import org.springframework.batch.core.converter.DefaultJobParametersConverter;
import org.springframework.batch.core.converter.JobParametersConverter;
+import org.springframework.batch.core.job.Job;
import org.springframework.batch.core.job.JobExecution;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.repository.JobRepository;
+import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
+import org.springframework.beans.BeansException;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
@@ -49,10 +57,12 @@
* @author Mahmoud Ben Hassine
* @since 5.0
*/
-public class JobOperatorFactoryBean implements FactoryBean, InitializingBean {
+public class JobOperatorFactoryBean implements FactoryBean, ApplicationContextAware, InitializingBean {
protected static final Log logger = LogFactory.getLog(JobOperatorFactoryBean.class);
+ private ApplicationContext applicationContext;
+
private PlatformTransactionManager transactionManager;
private TransactionAttributeSource transactionAttributeSource;
@@ -69,30 +79,44 @@ public class JobOperatorFactoryBean implements FactoryBean, Initial
private final ProxyFactory proxyFactory = new ProxyFactory();
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = applicationContext;
+ }
+
@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(this.jobRepository, "JobRepository must not be null");
- Assert.notNull(this.jobRegistry, "JobRegistry must not be null");
- Assert.notNull(this.transactionManager, "TransactionManager must not be null");
+ if (this.jobRegistry == null) {
+ this.jobRegistry = new MapJobRegistry();
+ populateJobRegistry();
+ logger.info(
+ "No JobRegistry has been set, defaulting to a MapJobRegistry populated with jobs defined in the application context.");
+ }
+ if (this.transactionManager == null) {
+ this.transactionManager = new ResourcelessTransactionManager();
+ logger.info("No transaction manager has been set, defaulting to ResourcelessTransactionManager.");
+ }
if (this.taskExecutor == null) {
logger.info("No TaskExecutor has been set, defaulting to synchronous executor.");
this.taskExecutor = new SyncTaskExecutor();
}
if (this.transactionAttributeSource == null) {
- this.transactionAttributeSource = new MethodMapTransactionAttributeSource();
- DefaultTransactionAttribute transactionAttribute = new DefaultTransactionAttribute();
- Method stopMethod = TaskExecutorJobOperator.class.getMethod("stop", JobExecution.class);
- Method abandonMethod = TaskExecutorJobOperator.class.getMethod("abandon", JobExecution.class);
- Method recoverMethod = TaskExecutorJobOperator.class.getMethod("recover", JobExecution.class);
- ((MethodMapTransactionAttributeSource) this.transactionAttributeSource).addTransactionalMethod(stopMethod,
- transactionAttribute);
- ((MethodMapTransactionAttributeSource) this.transactionAttributeSource)
- .addTransactionalMethod(abandonMethod, transactionAttribute);
- ((MethodMapTransactionAttributeSource) this.transactionAttributeSource)
- .addTransactionalMethod(recoverMethod, transactionAttribute);
+ this.transactionAttributeSource = new DefaultJobOperatorTransactionAttributeSource();
}
}
+ private void populateJobRegistry() {
+ this.applicationContext.getBeansOfType(Job.class).values().forEach(job -> {
+ try {
+ jobRegistry.register(job);
+ }
+ catch (DuplicateJobException e) {
+ throw new BatchConfigurationException(e);
+ }
+ });
+ }
+
/**
* Setter for the job registry.
* @param jobRegistry the job registry to set
@@ -189,4 +213,24 @@ private TaskExecutorJobOperator getTarget() throws Exception {
return taskExecutorJobOperator;
}
+ private static class DefaultJobOperatorTransactionAttributeSource extends MethodMapTransactionAttributeSource {
+
+ public DefaultJobOperatorTransactionAttributeSource() {
+ DefaultTransactionAttribute transactionAttribute = new DefaultTransactionAttribute();
+ try {
+ Method stopMethod = TaskExecutorJobOperator.class.getMethod("stop", JobExecution.class);
+ Method abandonMethod = TaskExecutorJobOperator.class.getMethod("abandon", JobExecution.class);
+ Method recoverMethod = TaskExecutorJobOperator.class.getMethod("recover", JobExecution.class);
+ addTransactionalMethod(stopMethod, transactionAttribute);
+ addTransactionalMethod(abandonMethod, transactionAttribute);
+ addTransactionalMethod(recoverMethod, transactionAttribute);
+ }
+ catch (NoSuchMethodException e) {
+ throw new IllegalStateException("Failed to initialize default transaction attributes for JobOperator",
+ e);
+ }
+ }
+
+ }
+
}
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java b/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java
index 047c5dd6d5..db0c1a351a 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/SimpleJobOperator.java
@@ -36,6 +36,7 @@
import org.springframework.batch.core.job.parameters.JobParametersIncrementer;
import org.springframework.batch.core.job.parameters.JobParametersInvalidException;
import org.springframework.batch.core.step.Step;
+import org.springframework.batch.core.step.StoppableStep;
import org.springframework.batch.core.step.StepExecution;
import org.springframework.batch.core.job.UnexpectedJobExecutionException;
import org.springframework.batch.core.configuration.JobRegistry;
@@ -81,6 +82,7 @@
* @author Mahmoud Ben Hassine
* @author Andrey Litvitski
* @author Yejeong Ham
+ * @author Hyunsang Han
* @since 2.0
* @deprecated since 6.0 in favor of {@link TaskExecutorJobOperator}. Scheduled for
* removal in 6.2 or later.
@@ -347,10 +349,15 @@ public boolean stop(JobExecution jobExecution) throws JobExecutionNotRunningExce
Tasklet tasklet = taskletStep.getTasklet();
if (tasklet instanceof StoppableTasklet stoppableTasklet) {
StepSynchronizationManager.register(stepExecution);
- stoppableTasklet.stop();
+ stoppableTasklet.stop(stepExecution);
StepSynchronizationManager.release();
}
}
+ if (step instanceof StoppableStep stoppableStep) {
+ StepSynchronizationManager.register(stepExecution);
+ stoppableStep.stop(stepExecution);
+ StepSynchronizationManager.release();
+ }
}
catch (NoSuchStepException e) {
logger.warn("Step not found", e);
@@ -360,7 +367,9 @@ public boolean stop(JobExecution jobExecution) throws JobExecutionNotRunningExce
}
}
catch (NoSuchJobException e) {
- logger.warn("Cannot find Job object in the job registry. StoppableTasklet#stop() will not be called", e);
+ logger.warn(
+ "Cannot find Job object in the job registry. StoppableTasklet#stop(StepExecution stepExecution) will not be called",
+ e);
}
return true;
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/TaskExecutorJobOperator.java b/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/TaskExecutorJobOperator.java
index 6434a22cfd..48bb126b5b 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/TaskExecutorJobOperator.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/launch/support/TaskExecutorJobOperator.java
@@ -27,6 +27,7 @@
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.core.launch.NoSuchJobExecutionException;
+import org.springframework.batch.core.observability.jfr.events.job.JobLaunchEvent;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
@@ -92,6 +93,7 @@ public JobExecution start(Job job, JobParameters jobParameters)
JobRestartException, JobParametersInvalidException {
Assert.notNull(job, "Job must not be null");
Assert.notNull(jobParameters, "JobParameters must not be null");
+ new JobLaunchEvent(job.getName(), jobParameters.toString()).commit();
return super.start(job, jobParameters);
}
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/listener/ChunkListener.java b/spring-batch-core/src/main/java/org/springframework/batch/core/listener/ChunkListener.java
index 01a4c721f3..45b91756db 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/listener/ChunkListener.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/listener/ChunkListener.java
@@ -24,6 +24,9 @@
*
* {@link ChunkListener} shouldn't throw exceptions and expect continued processing, they
* must be handled in the implementation or the step will terminate.
+ *
+ * Note: This listener is not called in concurrent steps.
+ *
*
* @author Lucas Ward
* @author Michael Minella
@@ -78,14 +81,16 @@ default void afterChunkError(ChunkContext context) {
}
/**
- * Callback before the chunk is processed, inside the transaction.
+ * Callback before the chunk is processed, inside the transaction. This method is not
+ * called in concurrent steps.
* @since 6.0
*/
default void beforeChunk(Chunk chunk) {
}
/**
- * Callback after the chunk is written, inside the transaction.
+ * Callback after the chunk is written, inside the transaction. This method is not
+ * called in concurrent steps.
* @since 6.0
*/
default void afterChunk(Chunk chunk) {
@@ -95,7 +100,7 @@ default void afterChunk(Chunk chunk) {
* Callback if an exception occurs while processing or writing a chunk, inside the
* transaction, which is about to be rolled back. As a result, you should use
* {@code PROPAGATION_REQUIRES_NEW} for any transactional operation that is called
- * here.
+ * here. This method is not called in concurrent steps.
* @param exception the exception that caused the underlying rollback.
* @param chunk the processed chunk
* @since 6.0
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/listener/StepListenerMetaData.java b/spring-batch-core/src/main/java/org/springframework/batch/core/listener/StepListenerMetaData.java
index 7ceff8a96f..4fb5aad1b1 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/listener/StepListenerMetaData.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/listener/StepListenerMetaData.java
@@ -46,6 +46,7 @@
* methods, their interfaces, annotation, and expected arguments.
*
* @author Lucas Ward
+ * @author Hyunsang Han
* @since 2.0
* @see StepListenerFactoryBean
*/
@@ -53,8 +54,8 @@ public enum StepListenerMetaData implements ListenerMetaData {
BEFORE_STEP("beforeStep", "before-step-method", BeforeStep.class, StepExecutionListener.class, StepExecution.class),
AFTER_STEP("afterStep", "after-step-method", AfterStep.class, StepExecutionListener.class, StepExecution.class),
- BEFORE_CHUNK("beforeChunk", "before-chunk-method", BeforeChunk.class, ChunkListener.class, ChunkContext.class),
- AFTER_CHUNK("afterChunk", "after-chunk-method", AfterChunk.class, ChunkListener.class, ChunkContext.class),
+ BEFORE_CHUNK("beforeChunk", "before-chunk-method", BeforeChunk.class, ChunkListener.class, Chunk.class),
+ AFTER_CHUNK("afterChunk", "after-chunk-method", AfterChunk.class, ChunkListener.class, Chunk.class),
AFTER_CHUNK_ERROR("afterChunkError", "after-chunk-error-method", AfterChunkError.class, ChunkListener.class,
ChunkContext.class),
BEFORE_READ("beforeRead", "before-read-method", BeforeRead.class, ItemReadListener.class),
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/observability/BatchMetrics.java b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/BatchMetrics.java
index 7e5a9e7595..6884dac28a 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/observability/BatchMetrics.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/BatchMetrics.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2019-2023 the original author or authors.
+ * Copyright 2019-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -55,6 +55,10 @@ public final class BatchMetrics {
public static final String STATUS_FAILURE = "FAILURE";
+ public static final String STATUS_COMMITTED = "COMMITTED";
+
+ public static final String STATUS_ROLLED_BACK = "ROLLED_BACK";
+
private BatchMetrics() {
}
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/job/JobExecutionEvent.java b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/job/JobExecutionEvent.java
new file mode 100644
index 0000000000..416c524444
--- /dev/null
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/job/JobExecutionEvent.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2025-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.core.observability.jfr.events.job;
+
+import jdk.jfr.Category;
+import jdk.jfr.Description;
+import jdk.jfr.Event;
+import jdk.jfr.Label;
+
+@Label("Job Execution")
+@Description("Job Execution Event")
+@Category({ "Spring Batch", "Job" })
+public class JobExecutionEvent extends Event {
+
+ @Label("Job Name")
+ public String jobName;
+
+ @Label("Job Instance Id")
+ public long jobInstanceId;
+
+ @Label("Job Execution Id")
+ public long jobExecutionId;
+
+ @Label("Job Exit Status")
+ public String exitStatus;
+
+ public JobExecutionEvent(String jobName, long jobInstanceId, long jobExecutionId) {
+ this.jobName = jobName;
+ this.jobInstanceId = jobInstanceId;
+ this.jobExecutionId = jobExecutionId;
+ }
+
+}
\ No newline at end of file
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/job/JobLaunchEvent.java b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/job/JobLaunchEvent.java
new file mode 100644
index 0000000000..269d099196
--- /dev/null
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/job/JobLaunchEvent.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2025-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.core.observability.jfr.events.job;
+
+import jdk.jfr.Category;
+import jdk.jfr.Description;
+import jdk.jfr.Event;
+import jdk.jfr.Label;
+
+@Label("Job Launch Request")
+@Description("Job Launch Request Event")
+@Category({ "Spring Batch", "Job" })
+public class JobLaunchEvent extends Event {
+
+ @Label("Job Name")
+ public String jobName;
+
+ @Label("Job Parameters")
+ public String jobParameters;
+
+ public JobLaunchEvent(String jobName, String jobParameters) {
+ this.jobParameters = jobParameters;
+ this.jobName = jobName;
+ }
+
+}
\ No newline at end of file
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/StepExecutionEvent.java b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/StepExecutionEvent.java
new file mode 100644
index 0000000000..6e7784fc79
--- /dev/null
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/StepExecutionEvent.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2025-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.core.observability.jfr.events.step;
+
+import jdk.jfr.Category;
+import jdk.jfr.Description;
+import jdk.jfr.Event;
+import jdk.jfr.Label;
+
+@Label("Step Execution")
+@Description("Step Execution Event")
+@Category({ "Spring Batch", "Step" })
+public class StepExecutionEvent extends Event {
+
+ @Label("Step Name")
+ public String stepName;
+
+ @Label("Job Name")
+ public String jobName;
+
+ @Label("Step Execution Id")
+ public long stepExecutionId;
+
+ @Label("Job Execution Id")
+ public long jobExecutionId;
+
+ @Label("Step Exit Status")
+ public String exitStatus;
+
+ public StepExecutionEvent(String stepName, String jobName, long stepExecutionId, long jobExecutionId) {
+ this.stepName = stepName;
+ this.jobName = jobName;
+ this.stepExecutionId = stepExecutionId;
+ this.jobExecutionId = jobExecutionId;
+ }
+
+}
\ No newline at end of file
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/chunk/ChunkScanEvent.java b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/chunk/ChunkScanEvent.java
new file mode 100644
index 0000000000..d5d11e7d1b
--- /dev/null
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/chunk/ChunkScanEvent.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2025-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.core.observability.jfr.events.step.chunk;
+
+import jdk.jfr.Category;
+import jdk.jfr.Description;
+import jdk.jfr.Event;
+import jdk.jfr.Label;
+
+@Label("Chunk Scan")
+@Description("Chunk Scan Event")
+@Category({ "Spring Batch", "Step", "Chunk" })
+public class ChunkScanEvent extends Event {
+
+ @Label("Step Name")
+ public String stepName;
+
+ @Label("Step Execution Id")
+ public long stepExecutionId;
+
+ @Label("Skip Count")
+ public long skipCount;
+
+ public ChunkScanEvent(String stepName, long stepExecutionId) {
+ this.stepName = stepName;
+ this.stepExecutionId = stepExecutionId;
+ }
+
+}
\ No newline at end of file
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/chunk/ChunkTransactionEvent.java b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/chunk/ChunkTransactionEvent.java
new file mode 100644
index 0000000000..695f3afcfa
--- /dev/null
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/chunk/ChunkTransactionEvent.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2025-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.core.observability.jfr.events.step.chunk;
+
+import jdk.jfr.Category;
+import jdk.jfr.Description;
+import jdk.jfr.Event;
+import jdk.jfr.Label;
+
+@Label("Chunk Transaction")
+@Description("Chunk Transaction Event")
+@Category({ "Spring Batch", "Step", "Chunk" })
+public class ChunkTransactionEvent extends Event {
+
+ @Label("Step Name")
+ public String stepName;
+
+ @Label("Step Execution Id")
+ public long stepExecutionId;
+
+ @Label("Transaction Status")
+ public String transactionStatus;
+
+ public ChunkTransactionEvent(String stepName, long stepExecutionId) {
+ this.stepName = stepName;
+ this.stepExecutionId = stepExecutionId;
+ }
+
+}
\ No newline at end of file
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/chunk/ChunkWriteEvent.java b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/chunk/ChunkWriteEvent.java
new file mode 100644
index 0000000000..6139abb60b
--- /dev/null
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/chunk/ChunkWriteEvent.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2025-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.core.observability.jfr.events.step.chunk;
+
+import jdk.jfr.Category;
+import jdk.jfr.Description;
+import jdk.jfr.Event;
+import jdk.jfr.Label;
+
+@Label("Chunk Write")
+@Description("Chunk Write Event")
+@Category({ "Spring Batch", "Step", "Chunk" })
+public class ChunkWriteEvent extends Event {
+
+ @Label("Step Name")
+ public String stepName;
+
+ @Label("Step Execution Id")
+ public long stepExecutionId;
+
+ @Label("Chunk Write Status")
+ public String chunkWriteStatus;
+
+ @Label("Item Count")
+ public long itemCount;
+
+ public ChunkWriteEvent(String stepName, long stepExecutionId, long itemCount) {
+ this.itemCount = itemCount;
+ this.stepName = stepName;
+ this.stepExecutionId = stepExecutionId;
+ }
+
+}
\ No newline at end of file
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/chunk/ItemProcessEvent.java b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/chunk/ItemProcessEvent.java
new file mode 100644
index 0000000000..358794dcff
--- /dev/null
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/chunk/ItemProcessEvent.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2025-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.core.observability.jfr.events.step.chunk;
+
+import jdk.jfr.Category;
+import jdk.jfr.Description;
+import jdk.jfr.Event;
+import jdk.jfr.Label;
+
+@Label("Item Process")
+@Description("Item Process Event")
+@Category({ "Spring Batch", "Step", "Chunk" })
+public class ItemProcessEvent extends Event {
+
+ @Label("Step Name")
+ public String stepName;
+
+ @Label("Step Execution Id")
+ public long stepExecutionId;
+
+ @Label("Item Process Status")
+ public String itemProcessStatus;
+
+ public ItemProcessEvent(String stepName, long stepExecutionId) {
+ this.stepName = stepName;
+ this.stepExecutionId = stepExecutionId;
+ }
+
+}
\ No newline at end of file
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/chunk/ItemReadEvent.java b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/chunk/ItemReadEvent.java
new file mode 100644
index 0000000000..5e55c0de3d
--- /dev/null
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/chunk/ItemReadEvent.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2025-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.core.observability.jfr.events.step.chunk;
+
+import jdk.jfr.Category;
+import jdk.jfr.Description;
+import jdk.jfr.Event;
+import jdk.jfr.Label;
+
+@Label("Item Read")
+@Description("Item Read Event")
+@Category({ "Spring Batch", "Step", "Chunk" })
+public class ItemReadEvent extends Event {
+
+ @Label("Step Name")
+ public String stepName;
+
+ @Label("Step Execution Id")
+ public long stepExecutionId;
+
+ @Label("Item Read Status")
+ public String itemReadStatus;
+
+ public ItemReadEvent(String stepName, long stepExecutionId) {
+ this.stepName = stepName;
+ this.stepExecutionId = stepExecutionId;
+ }
+
+}
\ No newline at end of file
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/partition/PartitionAggregateEvent.java b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/partition/PartitionAggregateEvent.java
new file mode 100644
index 0000000000..f516b08de1
--- /dev/null
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/partition/PartitionAggregateEvent.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2025-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.core.observability.jfr.events.step.partition;
+
+import jdk.jfr.Category;
+import jdk.jfr.Description;
+import jdk.jfr.Event;
+import jdk.jfr.Label;
+
+@Label("Partition Aggregate")
+@Description("Partition Aggregate Event")
+@Category({ "Spring Batch", "Step", "Partition" })
+public class PartitionAggregateEvent extends Event {
+
+ @Label("Step Name")
+ public String stepName;
+
+ @Label("Step Execution Id")
+ public long stepExecutionId;
+
+ public PartitionAggregateEvent(String stepName, long stepExecutionId) {
+ this.stepName = stepName;
+ this.stepExecutionId = stepExecutionId;
+ }
+
+}
\ No newline at end of file
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/partition/PartitionSplitEvent.java b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/partition/PartitionSplitEvent.java
new file mode 100644
index 0000000000..26504edc2f
--- /dev/null
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/partition/PartitionSplitEvent.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2025-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.core.observability.jfr.events.step.partition;
+
+import jdk.jfr.Category;
+import jdk.jfr.Description;
+import jdk.jfr.Event;
+import jdk.jfr.Label;
+
+@Label("Partition Split")
+@Description("Partition Split Event")
+@Category({ "Spring Batch", "Step", "Partition" })
+public class PartitionSplitEvent extends Event {
+
+ @Label("Step Name")
+ public String stepName;
+
+ @Label("Step Execution Id")
+ public long stepExecutionId;
+
+ @Label("Partition count")
+ public long partitionCount;
+
+ public PartitionSplitEvent(String stepName, long stepExecutionId) {
+ this.stepName = stepName;
+ this.stepExecutionId = stepExecutionId;
+ }
+
+}
\ No newline at end of file
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/tasklet/TaskletExecutionEvent.java b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/tasklet/TaskletExecutionEvent.java
new file mode 100644
index 0000000000..97fbb7b6d7
--- /dev/null
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/observability/jfr/events/step/tasklet/TaskletExecutionEvent.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2025-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.core.observability.jfr.events.step.tasklet;
+
+import jdk.jfr.Category;
+import jdk.jfr.Description;
+import jdk.jfr.Event;
+import jdk.jfr.Label;
+
+@Label("Tasklet Execution")
+@Description("Tasklet Execution Event")
+@Category({ "Spring Batch", "Step", "Tasklet" })
+public class TaskletExecutionEvent extends Event {
+
+ @Label("Step Name")
+ public String stepName;
+
+ @Label("Step Execution Id")
+ public long stepExecutionId;
+
+ @Label("Tasklet Type")
+ public String taskletType;
+
+ @Label("Tasklet Status")
+ public String taskletStatus;
+
+ public TaskletExecutionEvent(String stepName, long stepExecutionId, String taskletType) {
+ this.taskletType = taskletType;
+ this.stepName = stepName;
+ this.stepExecutionId = stepExecutionId;
+ }
+
+}
\ No newline at end of file
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/partition/PartitionStep.java b/spring-batch-core/src/main/java/org/springframework/batch/core/partition/PartitionStep.java
index 104d48e995..5600cab5f1 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/partition/PartitionStep.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/partition/PartitionStep.java
@@ -18,6 +18,8 @@
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.job.JobExecutionException;
+import org.springframework.batch.core.observability.jfr.events.step.partition.PartitionAggregateEvent;
+import org.springframework.batch.core.observability.jfr.events.step.partition.PartitionSplitEvent;
import org.springframework.batch.core.step.Step;
import org.springframework.batch.core.step.StepExecution;
import org.springframework.batch.core.partition.support.DefaultStepExecutionAggregator;
@@ -97,10 +99,21 @@ public void afterPropertiesSet() throws Exception {
protected void doExecute(StepExecution stepExecution) throws Exception {
stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());
- // Wait for task completion and then aggregate the results
+ // Split execution into partitions and wait for task completion
+ PartitionSplitEvent partitionSplitEvent = new PartitionSplitEvent(stepExecution.getStepName(),
+ stepExecution.getId());
+ partitionSplitEvent.begin();
Collection executions = partitionHandler.handle(stepExecutionSplitter, stepExecution);
+ partitionSplitEvent.partitionCount = executions.size();
stepExecution.upgradeStatus(BatchStatus.COMPLETED);
+ partitionSplitEvent.commit();
+
+ // aggregate the results of the executions
+ PartitionAggregateEvent partitionAggregateEvent = new PartitionAggregateEvent(stepExecution.getStepName(),
+ stepExecution.getId());
+ partitionAggregateEvent.begin();
stepExecutionAggregator.aggregate(stepExecution, executions);
+ partitionAggregateEvent.commit();
// If anything failed or had a problem we need to crap out
if (stepExecution.getStatus().isUnsuccessful()) {
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/AbstractJobRepositoryFactoryBean.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/AbstractJobRepositoryFactoryBean.java
index 1d304dba63..50a5ef4186 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/AbstractJobRepositoryFactoryBean.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/AbstractJobRepositoryFactoryBean.java
@@ -196,15 +196,8 @@ public void afterPropertiesSet() throws Exception {
jobKeyGenerator = new DefaultJobKeyGenerator();
}
if (this.transactionAttributeSource == null) {
- Properties transactionAttributes = new Properties();
- transactionAttributes.setProperty("create*",
- TRANSACTION_PROPAGATION_PREFIX + Propagation.REQUIRES_NEW + "," + this.isolationLevelForCreate);
- transactionAttributes.setProperty("getLastJobExecution*",
- TRANSACTION_PROPAGATION_PREFIX + Propagation.REQUIRES_NEW + "," + this.isolationLevelForCreate);
- transactionAttributes.setProperty("*", "PROPAGATION_REQUIRED");
- this.transactionAttributeSource = new NameMatchTransactionAttributeSource();
- ((NameMatchTransactionAttributeSource) this.transactionAttributeSource)
- .setProperties(transactionAttributes);
+ this.transactionAttributeSource = new DefaultJobRepositoryTransactionAttributeSource(
+ this.isolationLevelForCreate);
}
}
@@ -237,4 +230,18 @@ private Object getTarget() throws Exception {
createExecutionContextDao());
}
+ private static class DefaultJobRepositoryTransactionAttributeSource extends NameMatchTransactionAttributeSource {
+
+ public DefaultJobRepositoryTransactionAttributeSource(String isolationLevelForCreate) {
+ Properties transactionAttributes = new Properties();
+ transactionAttributes.setProperty("create*",
+ TRANSACTION_PROPAGATION_PREFIX + Propagation.REQUIRES_NEW + "," + isolationLevelForCreate);
+ transactionAttributes.setProperty("getLastJobExecution*",
+ TRANSACTION_PROPAGATION_PREFIX + Propagation.REQUIRES_NEW + "," + isolationLevelForCreate);
+ transactionAttributes.setProperty("*", "PROPAGATION_REQUIRED");
+ this.setProperties(transactionAttributes);
+ }
+
+ }
+
}
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/ResourcelessJobRepository.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/ResourcelessJobRepository.java
index 71347c3ca9..694cd471a7 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/ResourcelessJobRepository.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/support/ResourcelessJobRepository.java
@@ -16,7 +16,6 @@
package org.springframework.batch.core.repository.support;
import java.time.LocalDateTime;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -27,6 +26,7 @@
import org.springframework.batch.core.step.StepExecution;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
+import org.springframework.lang.Nullable;
/**
* A {@link JobRepository} implementation that does not use or store batch meta-data. It
@@ -50,6 +50,14 @@ public class ResourcelessJobRepository implements JobRepository {
private JobExecution jobExecution;
+ private long stepExecutionIdIncrementer = 0L;
+
+ /*
+ * ===================================================================================
+ * Job operations
+ * ===================================================================================
+ */
+
@Override
public List getJobNames() {
if (this.jobInstance == null) {
@@ -58,8 +66,41 @@ public List getJobNames() {
return Collections.singletonList(this.jobInstance.getJobName());
}
+ /*
+ * ===================================================================================
+ * Job instance operations
+ * ===================================================================================
+ */
+
+ @Override
+ public List getJobInstances(String jobName, int start, int count) {
+ if (this.jobInstance == null) {
+ return Collections.emptyList();
+ }
+ return Collections.singletonList(this.jobInstance);
+ }
+
+ @Override
+ @Nullable
+ public JobInstance getJobInstance(@Nullable Long instanceId) {
+ return this.jobInstance;
+ }
+
+ @Override
+ @Nullable
+ public JobInstance getLastJobInstance(String jobName) {
+ return this.jobInstance;
+ }
+
+ @Override
+ @Nullable
+ public JobInstance getJobInstance(String jobName, JobParameters jobParameters) {
+ return this.jobInstance;
+ }
+
@SuppressWarnings("removal")
@Override
+ @Deprecated(since = "6.0", forRemoval = true)
public boolean isJobInstanceExists(String jobName, JobParameters jobParameters) {
return false;
}
@@ -75,42 +116,51 @@ public JobInstance createJobInstance(String jobName, JobParameters jobParameters
return this.jobInstance;
}
+ /*
+ * ===================================================================================
+ * Job execution operations
+ * ===================================================================================
+ */
+
@Override
- public JobExecution createJobExecution(String jobName, JobParameters jobParameters) {
- if (this.jobInstance == null) {
- createJobInstance(jobName, jobParameters);
- }
- this.jobExecution = new JobExecution(this.jobInstance, 1L, jobParameters);
+ @Nullable
+ public JobExecution getJobExecution(Long executionId) {
return this.jobExecution;
}
@Override
- public void update(JobExecution jobExecution) {
- jobExecution.setLastUpdated(LocalDateTime.now());
- this.jobExecution = jobExecution;
+ @Nullable
+ public JobExecution getLastJobExecution(String jobName, JobParameters jobParameters) {
+ return this.jobExecution;
}
@Override
- public void add(StepExecution stepExecution) {
- this.addAll(Collections.singletonList(stepExecution));
+ @Nullable
+ public JobExecution getLastJobExecution(JobInstance jobInstance) {
+ return this.jobExecution;
}
@Override
- public void addAll(Collection stepExecutions) {
- this.jobExecution.addStepExecutions(new ArrayList<>(stepExecutions));
+ public List getJobExecutions(JobInstance jobInstance) {
+ if (this.jobExecution == null) {
+ return Collections.emptyList();
+ }
+ return Collections.singletonList(this.jobExecution);
}
@Override
- public void update(StepExecution stepExecution) {
- stepExecution.setLastUpdated(LocalDateTime.now());
- if (this.jobExecution.isStopping()) {
- stepExecution.setTerminateOnly();
+ public JobExecution createJobExecution(String jobName, JobParameters jobParameters) {
+ if (this.jobInstance == null) {
+ createJobInstance(jobName, jobParameters);
}
+ this.jobExecution = new JobExecution(this.jobInstance, 1L, jobParameters);
+ return this.jobExecution;
}
@Override
- public void updateExecutionContext(StepExecution stepExecution) {
- stepExecution.setLastUpdated(LocalDateTime.now());
+ public void update(JobExecution jobExecution) {
+ jobExecution.setLastUpdated(LocalDateTime.now());
+ this.jobExecution = jobExecution;
}
@Override
@@ -118,8 +168,31 @@ public void updateExecutionContext(JobExecution jobExecution) {
jobExecution.setLastUpdated(LocalDateTime.now());
}
+ /*
+ * ===================================================================================
+ * Step execution operations
+ * ===================================================================================
+ */
+
@Override
+ @Nullable
+ public StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId) {
+ if (this.jobExecution == null || !this.jobExecution.getId().equals(jobExecutionId)) {
+ return null;
+ }
+ return this.jobExecution.getStepExecutions()
+ .stream()
+ .filter(stepExecution -> stepExecution.getId().equals(stepExecutionId))
+ .findFirst()
+ .orElse(null);
+ }
+
+ @Override
+ @Nullable
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
+ if (this.jobExecution == null || !this.jobExecution.getJobInstance().getId().equals(jobInstance.getId())) {
+ return null;
+ }
return this.jobExecution.getStepExecutions()
.stream()
.filter(stepExecution -> stepExecution.getStepName().equals(stepName))
@@ -136,8 +209,28 @@ public long getStepExecutionCount(JobInstance jobInstance, String stepName) {
}
@Override
- public JobExecution getLastJobExecution(String jobName, JobParameters jobParameters) {
- return this.jobExecution;
+ public void add(StepExecution stepExecution) {
+ stepExecution.setId(this.stepExecutionIdIncrementer++);
+ }
+
+ @Override
+ public void addAll(Collection stepExecutions) {
+ for (StepExecution stepExecution : stepExecutions) {
+ this.add(stepExecution);
+ }
+ }
+
+ @Override
+ public void update(StepExecution stepExecution) {
+ stepExecution.setLastUpdated(LocalDateTime.now());
+ if (this.jobExecution.isStopping()) {
+ stepExecution.setTerminateOnly();
+ }
+ }
+
+ @Override
+ public void updateExecutionContext(StepExecution stepExecution) {
+ stepExecution.setLastUpdated(LocalDateTime.now());
}
}
\ No newline at end of file
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/AbstractStep.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/AbstractStep.java
index ba83296433..bcc16338fd 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/AbstractStep.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/AbstractStep.java
@@ -42,6 +42,7 @@
import org.springframework.batch.core.observability.BatchStepObservation;
import org.springframework.batch.core.observability.BatchStepObservationConvention;
import org.springframework.batch.core.observability.DefaultBatchStepObservationConvention;
+import org.springframework.batch.core.observability.jfr.events.step.StepExecutionEvent;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.scope.context.StepSynchronizationManager;
import org.springframework.batch.item.ExecutionContext;
@@ -63,7 +64,7 @@
* @author Mahmoud Ben Hassine
* @author Jinwoo Bae
*/
-public abstract class AbstractStep implements Step, InitializingBean, BeanNameAware {
+public abstract class AbstractStep implements StoppableStep, InitializingBean, BeanNameAware {
private static final Log logger = LogFactory.getLog(AbstractStep.class);
@@ -200,11 +201,17 @@ public final void execute(StepExecution stepExecution)
throws JobInterruptedException, UnexpectedJobExecutionException {
Assert.notNull(stepExecution, "stepExecution must not be null");
+ Assert.state(stepExecution.getId() != null,
+ "StepExecution has no id. It must be saved before it can be executed.");
stepExecution.getExecutionContext().put(SpringBatchVersion.BATCH_VERSION_KEY, SpringBatchVersion.getVersion());
if (logger.isDebugEnabled()) {
logger.debug("Executing: id=" + stepExecution.getId());
}
+ StepExecutionEvent stepExecutionEvent = new StepExecutionEvent(stepExecution.getStepName(),
+ stepExecution.getJobExecution().getJobInstance().getJobName(), stepExecution.getId(),
+ stepExecution.getJobExecutionId());
+ stepExecutionEvent.begin();
stepExecution.setStartTime(LocalDateTime.now());
stepExecution.setStatus(BatchStatus.STARTED);
Observation observation = BatchMetrics
@@ -291,6 +298,8 @@ public final void execute(StepExecution stepExecution)
+ "This job is now in an unknown state and should not be restarted.",
name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
}
+ stepExecutionEvent.exitStatus = stepExecution.getExitStatus().getExitCode();
+ stepExecutionEvent.commit();
stopObservation(stepExecution, observation);
stepExecution.setExitStatus(exitStatus);
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/Step.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/Step.java
index 071560b3cb..03447ddb0e 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/Step.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/Step.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2006-2022 the original author or authors.
+ * Copyright 2006-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
* @author Mahmoud Ben Hassine
*
*/
+@FunctionalInterface
public interface Step {
/**
@@ -35,9 +36,14 @@ public interface Step {
String STEP_TYPE_KEY = "batch.stepType";
/**
- * @return the name of this step.
+ * The name of the step. This is used to distinguish between different steps and must
+ * be unique within a job. If not explicitly set, the name will default to the fully
+ * qualified class name.
+ * @return the name of the step (never {@code null})
*/
- String getName();
+ default String getName() {
+ return this.getClass().getName();
+ }
/**
* @return {@code true} if a step that is already marked as complete can be started
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/StepContribution.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/StepContribution.java
index 340505c964..ae703f9277 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/StepContribution.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/StepContribution.java
@@ -73,7 +73,15 @@ public ExitStatus getExitStatus() {
}
/**
- * Increment the counter for the number of items processed.
+ * Increment the counter for the number of filtered items.
+ * @since 6.0.0
+ */
+ public void incrementFilterCount() {
+ this.incrementFilterCount(1);
+ }
+
+ /**
+ * Increment the counter for the number of filtered items.
* @param count The {@code long} amount to increment by.
*/
public void incrementFilterCount(long count) {
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/StoppableStep.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/StoppableStep.java
new file mode 100644
index 0000000000..e455861f96
--- /dev/null
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/StoppableStep.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2025-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.batch.core.step;
+
+/**
+ * Extension of the {@link Step} interface to be implemented by steps that support being
+ * stopped.
+ *
+ * @author Mahmoud Ben Hassine
+ * @since 6.0
+ */
+public interface StoppableStep extends Step {
+
+ /**
+ * Callback to signal the step to stop. The default implementation sets the
+ * {@link StepExecution} to terminate only. Concrete implementations can override this
+ * method to add custom stop logic.
+ * @param stepExecution the current step execution
+ */
+ default void stop(StepExecution stepExecution) {
+ stepExecution.setTerminateOnly();
+ }
+
+}
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/ChunkOrientedStepBuilder.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/ChunkOrientedStepBuilder.java
index 21ffbcb799..8b38d9584c 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/ChunkOrientedStepBuilder.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/ChunkOrientedStepBuilder.java
@@ -16,10 +16,13 @@
package org.springframework.batch.core.step.builder;
import java.lang.reflect.Method;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
+import io.micrometer.core.instrument.MeterRegistry;
+
import org.springframework.batch.core.annotation.AfterChunk;
import org.springframework.batch.core.annotation.AfterProcess;
import org.springframework.batch.core.annotation.AfterRead;
@@ -44,17 +47,21 @@
import org.springframework.batch.core.step.ThreadStepInterruptionPolicy;
import org.springframework.batch.core.step.item.ChunkOrientedStep;
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
+import org.springframework.batch.core.step.skip.LimitCheckingExceptionHierarchySkipPolicy;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.support.ReflectionUtils;
+import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.core.retry.RetryListener;
import org.springframework.core.retry.RetryPolicy;
+import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
import org.springframework.transaction.interceptor.TransactionAttribute;
+import org.springframework.util.Assert;
/**
* A builder for {@link ChunkOrientedStep}. This class extends {@link StepBuilderHelper}
@@ -73,7 +80,7 @@ public class ChunkOrientedStepBuilder extends StepBuilderHelper writer;
- private final PlatformTransactionManager transactionManager;
+ private PlatformTransactionManager transactionManager = new ResourcelessTransactionManager();
private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute();
@@ -85,25 +92,39 @@ public class ChunkOrientedStepBuilder extends StepBuilderHelper false;
+ private RetryPolicy retryPolicy;
private final Set retryListeners = new LinkedHashSet<>();
- private SkipPolicy skipPolicy = new AlwaysSkipItemSkipPolicy();
+ private final Set> retryableExceptions = new HashSet<>();
+
+ private long retryLimit = -1;
+
+ private SkipPolicy skipPolicy;
private final Set> skipListeners = new LinkedHashSet<>();
+ private final Set> skippableExceptions = new HashSet<>();
+
+ private long skipLimit = -1;
+
+ private AsyncTaskExecutor asyncTaskExecutor;
+
+ private MeterRegistry meterRegistry;
+
+ ChunkOrientedStepBuilder(StepBuilderHelper> parent, int chunkSize) {
+ super(parent);
+ this.chunkSize = chunkSize;
+ }
+
/**
* Create a new {@link ChunkOrientedStepBuilder} with the given job repository and
* transaction manager. The step name will be assigned to the bean name.
* @param jobRepository the job repository
- * @param transactionManager the transaction manager
* @param chunkSize the size of the chunk to be processed
*/
- public ChunkOrientedStepBuilder(JobRepository jobRepository, PlatformTransactionManager transactionManager,
- int chunkSize) {
+ public ChunkOrientedStepBuilder(JobRepository jobRepository, int chunkSize) {
super(jobRepository);
- this.transactionManager = transactionManager;
this.chunkSize = chunkSize;
}
@@ -112,13 +133,10 @@ public ChunkOrientedStepBuilder(JobRepository jobRepository, PlatformTransaction
* repository and transaction manager.
* @param name the step name
* @param jobRepository the job repository
- * @param transactionManager the transaction manager
* @param chunkSize the size of the chunk to be processed
*/
- public ChunkOrientedStepBuilder(String name, JobRepository jobRepository,
- PlatformTransactionManager transactionManager, int chunkSize) {
+ public ChunkOrientedStepBuilder(String name, JobRepository jobRepository, int chunkSize) {
super(name, jobRepository);
- this.transactionManager = transactionManager;
this.chunkSize = chunkSize;
}
@@ -163,6 +181,17 @@ public ChunkOrientedStepBuilder writer(ItemWriter writer) {
return self();
}
+ /**
+ * Sets the transaction manager to use for the chunk-oriented tasklet. Defaults to a
+ * {@link ResourcelessTransactionManager} if none is provided.
+ * @param transactionManager a transaction manager set
+ * @return this for fluent chaining
+ */
+ public ChunkOrientedStepBuilder transactionManager(PlatformTransactionManager transactionManager) {
+ this.transactionManager = transactionManager;
+ return self();
+ }
+
/**
* Sets the transaction attributes for the tasklet execution. Defaults to the default
* values for the transaction manager, but can be manipulated to provide longer
@@ -256,6 +285,7 @@ public ChunkOrientedStepBuilder faultTolerant() {
* @return this for fluent chaining
*/
public ChunkOrientedStepBuilder retryPolicy(RetryPolicy retryPolicy) {
+ Assert.notNull(retryPolicy, "retryPolicy must not be null");
this.retryPolicy = retryPolicy;
return self();
}
@@ -271,6 +301,18 @@ public ChunkOrientedStepBuilder retryListener(RetryListener retryListener)
return self();
}
+ @SafeVarargs
+ public final ChunkOrientedStepBuilder retry(Class extends Throwable>... retryableExceptions) {
+ this.retryableExceptions.addAll(Arrays.stream(retryableExceptions).toList());
+ return self();
+ }
+
+ public ChunkOrientedStepBuilder retryLimit(long retryLimit) {
+ Assert.isTrue(retryLimit > 0, "retryLimit must be positive");
+ this.retryLimit = retryLimit;
+ return self();
+ }
+
/**
* Set the skip policy for the step. This policy determines how the step handles
* skipping items in case of failures. It can be used to define the conditions under
@@ -280,6 +322,7 @@ public ChunkOrientedStepBuilder retryListener(RetryListener retryListener)
* @return this for fluent chaining
*/
public ChunkOrientedStepBuilder skipPolicy(SkipPolicy skipPolicy) {
+ Assert.notNull(skipPolicy, "skipPolicy must not be null");
this.skipPolicy = skipPolicy;
return self();
}
@@ -296,16 +339,78 @@ public ChunkOrientedStepBuilder skipListener(SkipListener skipListen
return self();
}
+ @SafeVarargs
+ public final ChunkOrientedStepBuilder skip(Class extends Throwable>... skippableExceptions) {
+ this.skippableExceptions.addAll(Arrays.stream(skippableExceptions).toList());
+ return self();
+ }
+
+ public ChunkOrientedStepBuilder skipLimit(long skipLimit) {
+ Assert.isTrue(skipLimit > 0, "skipLimit must be positive");
+ this.skipLimit = skipLimit;
+ return self();
+ }
+
+ /**
+ * Set the asynchronous task executor to be used for processing items concurrently.
+ * This allows for concurrent processing of items, improving performance and
+ * throughput. If not set, the step will process items sequentially.
+ * @param asyncTaskExecutor the asynchronous task executor to use
+ * @return this for fluent chaining
+ */
+ public ChunkOrientedStepBuilder taskExecutor(AsyncTaskExecutor asyncTaskExecutor) {
+ this.asyncTaskExecutor = asyncTaskExecutor;
+ return self();
+ }
+
+ /**
+ * Set the meter registry to be used for collecting metrics during step execution.
+ * This allows for monitoring and analyzing the performance of the step. If not set,
+ * it will default to {@link io.micrometer.core.instrument.Metrics#globalRegistry}.
+ * @param meterRegistry the MeterRegistry to use
+ * @return this for fluent chaining
+ */
+ public ChunkOrientedStepBuilder meterRegistry(MeterRegistry meterRegistry) {
+ this.meterRegistry = meterRegistry;
+ return self();
+ }
+
@SuppressWarnings("unchecked")
public ChunkOrientedStep build() {
ChunkOrientedStep chunkOrientedStep = new ChunkOrientedStep<>(this.getName(), this.chunkSize, this.reader,
- this.writer, this.getJobRepository(), this.transactionManager);
- chunkOrientedStep.setItemProcessor(this.processor);
+ this.writer, this.getJobRepository());
+ if (this.processor != null) {
+ chunkOrientedStep.setItemProcessor(this.processor);
+ }
+ chunkOrientedStep.setTransactionManager(this.transactionManager);
chunkOrientedStep.setTransactionAttribute(this.transactionAttribute);
chunkOrientedStep.setInterruptionPolicy(this.interruptionPolicy);
+ if (this.retryPolicy == null) {
+ if (!this.retryableExceptions.isEmpty() || this.retryLimit > 0) {
+ this.retryPolicy = RetryPolicy.builder()
+ .maxAttempts(this.retryLimit)
+ .includes(this.retryableExceptions)
+ .build();
+ }
+ else {
+ this.retryPolicy = throwable -> false;
+ }
+ }
chunkOrientedStep.setRetryPolicy(this.retryPolicy);
+ if (this.skipPolicy == null) {
+ if (!this.skippableExceptions.isEmpty() || this.skipLimit > 0) {
+ this.skipPolicy = new LimitCheckingExceptionHierarchySkipPolicy(this.skippableExceptions,
+ this.skipLimit);
+ }
+ else {
+ this.skipPolicy = new AlwaysSkipItemSkipPolicy();
+ }
+ }
chunkOrientedStep.setSkipPolicy(this.skipPolicy);
chunkOrientedStep.setFaultTolerant(this.faultTolerant);
+ if (this.asyncTaskExecutor != null) {
+ chunkOrientedStep.setTaskExecutor(this.asyncTaskExecutor);
+ }
streams.forEach(chunkOrientedStep::registerItemStream);
stepListeners.forEach(stepListener -> {
if (stepListener instanceof ItemReadListener) {
@@ -323,6 +428,9 @@ public ChunkOrientedStep build() {
});
retryListeners.forEach(chunkOrientedStep::registerRetryListener);
skipListeners.forEach(chunkOrientedStep::registerSkipListener);
+ if (this.meterRegistry != null) {
+ chunkOrientedStep.setMeterRegistry(this.meterRegistry);
+ }
try {
chunkOrientedStep.afterPropertiesSet();
}
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/FaultTolerantStepBuilder.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/FaultTolerantStepBuilder.java
index b40688b58c..f18c567eb3 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/FaultTolerantStepBuilder.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/FaultTolerantStepBuilder.java
@@ -92,7 +92,11 @@
* @author Mahmoud Ben Hassine
* @author Ian Choi
* @since 2.2
+ * @deprecated Since 6.0, use
+ * {@link org.springframework.batch.core.step.builder.ChunkOrientedStepBuilder} instead.
+ * Scheduled for removal in 7.0.
*/
+@Deprecated(since = "6.0", forRemoval = true)
public class FaultTolerantStepBuilder extends SimpleStepBuilder {
private final ChunkMonitor chunkMonitor = new ChunkMonitor();
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/SimpleStepBuilder.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/SimpleStepBuilder.java
index 0ce25a8184..bd711e81e1 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/SimpleStepBuilder.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/SimpleStepBuilder.java
@@ -66,7 +66,10 @@
* @author Mahmoud Ben Hassine
* @author Parikshit Dutta
* @since 2.2
+ * @deprecated Since 6.0 in favor of {@link ChunkOrientedStepBuilder}. Scheduled for
+ * removal in 7.0.
*/
+@Deprecated(since = "6.0", forRemoval = true)
public class SimpleStepBuilder extends AbstractTaskletStepBuilder> {
private static final int DEFAULT_COMMIT_INTERVAL = 1;
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/StepBuilder.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/StepBuilder.java
index 90fcca83b0..9442429842 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/StepBuilder.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/StepBuilder.java
@@ -65,6 +65,16 @@ public TaskletStepBuilder tasklet(Tasklet tasklet, PlatformTransactionManager tr
return new TaskletStepBuilder(this).tasklet(tasklet, transactionManager);
}
+ /**
+ * Build a step with a custom tasklet, not necessarily item processing.
+ * @param tasklet a tasklet
+ * @return a {@link TaskletStepBuilder}
+ * @since 6.0
+ */
+ public TaskletStepBuilder tasklet(Tasklet tasklet) {
+ return new TaskletStepBuilder(this).tasklet(tasklet);
+ }
+
/**
* Build a step that processes items in chunks with the size provided. To extend the
* step to being fault tolerant, call the {@link SimpleStepBuilder#faultTolerant()}
@@ -82,11 +92,28 @@ public TaskletStepBuilder tasklet(Tasklet tasklet, PlatformTransactionManager tr
* @param the type of item to be processed as input
* @param the type of item to be output
* @since 5.0
+ * @deprecated since 6.0, use {@link #chunk(int)} instead. Scheduled for removal in
+ * 7.0.
*/
+ @Deprecated(since = "6.0", forRemoval = true)
public SimpleStepBuilder chunk(int chunkSize, PlatformTransactionManager transactionManager) {
return new SimpleStepBuilder(this).transactionManager(transactionManager).chunk(chunkSize);
}
+ /**
+ * Build a step that processes items in chunks with the size provided. To extend the
+ * step to being fault-tolerant, call the
+ * {@link ChunkOrientedStepBuilder#faultTolerant()} method on the builder.
+ * @param chunkSize the chunk size (commit interval)
+ * @return a {@link ChunkOrientedStepBuilder} for method chaining
+ * @param the type of item to be processed as input
+ * @param the type of item to be output
+ * @since 6.0
+ */
+ public ChunkOrientedStepBuilder chunk(int chunkSize) {
+ return new ChunkOrientedStepBuilder<>(this, chunkSize);
+ }
+
/**
* Build a step that processes items in chunks with the completion policy provided. To
* extend the step to being fault tolerant, call the
@@ -105,8 +132,8 @@ public SimpleStepBuilder chunk(int chunkSize, PlatformTransactionMa
* @param the type of item to be processed as input
* @param the type of item to be output
* @since 5.0
- * @deprecated since 6.0, use {@link #chunk(int, PlatformTransactionManager)} instead.
- * Scheduled for removal in 6.2 or later.
+ * @deprecated since 6.0, use {@link #chunk(int)} instead. Scheduled for removal in
+ * 7.0.
*/
@Deprecated(since = "6.0", forRemoval = true)
public SimpleStepBuilder chunk(CompletionPolicy completionPolicy,
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/TaskletStepBuilder.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/TaskletStepBuilder.java
index 896fce2cea..d21ccc5a31 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/TaskletStepBuilder.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/builder/TaskletStepBuilder.java
@@ -49,6 +49,16 @@ public TaskletStepBuilder tasklet(Tasklet tasklet, PlatformTransactionManager tr
return this;
}
+ /**
+ * @param tasklet the tasklet to use
+ * @return this for fluent chaining
+ * @since 6.0
+ */
+ public TaskletStepBuilder tasklet(Tasklet tasklet) {
+ this.tasklet = tasklet;
+ return this;
+ }
+
@Override
protected TaskletStepBuilder self() {
return this;
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/BatchListenerFactoryHelper.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/BatchListenerFactoryHelper.java
index 5d6d22ce0c..0204b7dfea 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/BatchListenerFactoryHelper.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/BatchListenerFactoryHelper.java
@@ -22,10 +22,11 @@
/**
* Package private helper for step factory beans.
- *
+ *
* @author Dave Syer
- *
+ * @deprecated Since 6.0 with no replacement. Scheduled for removal in 7.0.
*/
+@Deprecated(since = "6.0", forRemoval = true)
abstract class BatchListenerFactoryHelper {
public static List getListeners(StepListener[] listeners, Class super T> cls) {
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/FaultTolerantStepFactoryBean.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/FaultTolerantStepFactoryBean.java
index 61e867db80..e85c960622 100755
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/FaultTolerantStepFactoryBean.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/FaultTolerantStepFactoryBean.java
@@ -48,8 +48,9 @@
* @author Robert Kasanicky
* @author Morten Andersen-Gott
* @author Ian Choi
- *
+ * @deprecated Since 6.0 with no replacement. Scheduled for removal in 7.0.
*/
+@Deprecated(since = "6.0", forRemoval = true)
public class FaultTolerantStepFactoryBean extends SimpleStepFactoryBean {
private Map, Boolean> skippableExceptionClasses = new HashMap<>();
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/SimpleStepFactoryBean.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/SimpleStepFactoryBean.java
index f12f4fda1a..646928d85a 100755
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/SimpleStepFactoryBean.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/factory/SimpleStepFactoryBean.java
@@ -59,8 +59,9 @@
* @author Dave Syer
* @author Robert Kasanicky
* @author Mahmoud Ben Hassine
- *
+ * @deprecated Since 6.0 with no replacement. Scheduled for removal in 7.0.
*/
+@Deprecated(since = "6.0", forRemoval = true)
public class SimpleStepFactoryBean implements FactoryBean, BeanNameAware {
private String name;
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/BatchRetryTemplate.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/BatchRetryTemplate.java
index 4d16fbf665..8004deb3f1 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/BatchRetryTemplate.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/BatchRetryTemplate.java
@@ -53,8 +53,9 @@
*
* @author Dave Syer
* @author Mahmoud Ben Hassine
- *
+ * @deprecated Since 6.0 with no replacement. Scheduled for removal in 7.0.
*/
+@Deprecated(since = "6.0", forRemoval = true)
public class BatchRetryTemplate implements RetryOperations {
private static class BatchRetryState extends DefaultRetryState {
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkMonitor.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkMonitor.java
index edcb5b0a34..a1a4044b05 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkMonitor.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkMonitor.java
@@ -34,7 +34,9 @@
* @author Mahmoud Ben Hassine
* @author Seungrae Kim
* @since 2.0
+ * @deprecated Since 6.0 with no replacement. Scheduled for removal in 7.0.
*/
+@Deprecated(since = "6.0", forRemoval = true)
public class ChunkMonitor extends ItemStreamSupport {
private final Log logger = LogFactory.getLog(getClass());
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java
index a5943c96ee..720aa0556a 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedStep.java
@@ -15,12 +15,18 @@
*/
package org.springframework.batch.core.step.item;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Tag;
+import io.micrometer.core.instrument.Timer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jspecify.annotations.Nullable;
-import org.springframework.batch.core.BatchStatus;
-import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.job.JobInterruptedException;
import org.springframework.batch.core.listener.ChunkListener;
import org.springframework.batch.core.listener.CompositeChunkListener;
@@ -32,6 +38,14 @@
import org.springframework.batch.core.listener.ItemReadListener;
import org.springframework.batch.core.listener.ItemWriteListener;
import org.springframework.batch.core.listener.SkipListener;
+import org.springframework.batch.core.observability.BatchMetrics;
+import org.springframework.batch.core.observability.jfr.events.step.chunk.ChunkScanEvent;
+import org.springframework.batch.core.observability.jfr.events.step.chunk.ChunkTransactionEvent;
+import org.springframework.batch.core.observability.jfr.events.step.chunk.ChunkWriteEvent;
+import org.springframework.batch.core.observability.jfr.events.step.chunk.ItemProcessEvent;
+import org.springframework.batch.core.observability.jfr.events.step.chunk.ItemReadEvent;
+import org.springframework.batch.core.scope.context.StepContext;
+import org.springframework.batch.core.scope.context.StepSynchronizationManager;
import org.springframework.batch.core.step.StepContribution;
import org.springframework.batch.core.step.StepExecution;
import org.springframework.batch.core.repository.JobRepository;
@@ -40,6 +54,7 @@
import org.springframework.batch.core.step.StepInterruptionPolicy;
import org.springframework.batch.core.step.ThreadStepInterruptionPolicy;
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
+import org.springframework.batch.core.step.skip.SkipListenerFailedException;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ExecutionContext;
@@ -48,26 +63,29 @@
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemStream;
+import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.core.retry.RetryException;
import org.springframework.core.retry.RetryListener;
import org.springframework.core.retry.RetryPolicy;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.core.retry.Retryable;
import org.springframework.core.retry.support.CompositeRetryListener;
+import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
import org.springframework.transaction.interceptor.TransactionAttribute;
-import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;
/**
- * Step implementation for the chunk-oriented processing model.
+ * Step implementation for the chunk-oriented processing model. This class also supports
+ * faut-tolerance features (retry and skip) as well as concurrent item processing when a
+ * {@link AsyncTaskExecutor} is provided.
*
- * @author Mahmoud Ben Hassine
* @param type of input items
* @param type of output items
+ * @author Mahmoud Ben Hassine
* @since 6.0
*/
public class ChunkOrientedStep extends AbstractStep {
@@ -100,11 +118,11 @@ public class ChunkOrientedStep extends AbstractStep {
/*
* Transaction related parameters
*/
- private final PlatformTransactionManager transactionManager;
+ private PlatformTransactionManager transactionManager;
private TransactionTemplate transactionTemplate;
- private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute();
+ private TransactionAttribute transactionAttribute;
/*
* Chunk related parameters
@@ -130,6 +148,16 @@ public class ChunkOrientedStep extends AbstractStep {
private final CompositeSkipListener compositeSkipListener = new CompositeSkipListener<>();
+ /*
+ * Concurrency parameters
+ */
+ private AsyncTaskExecutor taskExecutor;
+
+ /*
+ * Observability parameters
+ */
+ private MeterRegistry meterRegistry;
+
/**
* Create a new {@link ChunkOrientedStep}.
* @param name the name of the step
@@ -137,16 +165,14 @@ public class ChunkOrientedStep extends AbstractStep {
* @param itemReader the item reader to read items
* @param itemWriter the item writer to write items
* @param jobRepository the job repository to use for this step
- * @param transactionManager the transaction manager to use for this step
*/
public ChunkOrientedStep(String name, int chunkSize, ItemReader itemReader, ItemWriter itemWriter,
- JobRepository jobRepository, PlatformTransactionManager transactionManager) {
+ JobRepository jobRepository) {
super(name);
this.chunkSize = chunkSize;
this.itemReader = itemReader;
this.itemWriter = itemWriter;
setJobRepository(jobRepository);
- this.transactionManager = transactionManager;
}
/**
@@ -214,6 +240,16 @@ public void registerChunkListener(ChunkListener chunkListener) {
this.compositeChunkListener.register(chunkListener);
}
+ /**
+ * Set the {@link PlatformTransactionManager} to use for the chunk-oriented tasklet.
+ * Defaults to a {@link ResourcelessTransactionManager}.
+ * @param transactionManager a transaction manager set, must not be null.
+ */
+ public void setTransactionManager(PlatformTransactionManager transactionManager) {
+ Assert.notNull(transactionManager, "Transaction manager must not be null");
+ this.transactionManager = transactionManager;
+ }
+
/**
* Set the transaction attribute for this step.
* @param transactionAttribute the transaction attribute to set
@@ -234,6 +270,15 @@ public void setFaultTolerant(boolean faultTolerant) {
this.faultTolerant = faultTolerant;
}
+ /**
+ * Set the {@link AsyncTaskExecutor} to use for processing items asynchronously.
+ * @param asyncTaskExecutor the asynchronous task executor to set
+ */
+ public void setTaskExecutor(AsyncTaskExecutor asyncTaskExecutor) {
+ Assert.notNull(asyncTaskExecutor, "Task executor must not be null");
+ this.taskExecutor = asyncTaskExecutor;
+ }
+
/**
* Set the {@link RetryPolicy} for this step.
* @param retryPolicy the retry policy to set
@@ -273,10 +318,27 @@ public void registerSkipListener(SkipListener skipListener) {
this.compositeSkipListener.register(skipListener);
}
+ /**
+ * Set the meter registry to use for metrics.
+ * @param meterRegistry the meter registry
+ * @since 6.0
+ */
+ public void setMeterRegistry(MeterRegistry meterRegistry) {
+ Assert.notNull(meterRegistry, "Meter registry must not be null");
+ this.meterRegistry = meterRegistry;
+ }
+
@Override
public void afterPropertiesSet() throws Exception {
super.afterPropertiesSet();
- Assert.notNull(this.transactionManager, "Transaction manager must not be null");
+ if (this.transactionManager == null) {
+ logger.info("No transaction manager has been set. Defaulting to ResourcelessTransactionManager.");
+ this.transactionManager = new ResourcelessTransactionManager();
+ }
+ if (this.transactionAttribute == null) {
+ logger.info("No transaction attribute has been set. Defaulting to DefaultTransactionAttribute.");
+ this.transactionAttribute = new DefaultTransactionAttribute();
+ }
Assert.isTrue(this.chunkSize > 0, "Chunk size must be greater than 0");
Assert.notNull(this.itemReader, "Item reader must not be null");
Assert.notNull(this.itemWriter, "Item writer must not be null");
@@ -295,6 +357,10 @@ public void afterPropertiesSet() throws Exception {
this.retryTemplate.setRetryPolicy(this.retryPolicy);
this.retryTemplate.setRetryListener(this.compositeRetryListener);
}
+ if (this.meterRegistry == null) {
+ logger.info("No meter registry has been set. Defaulting to the global meter registry.");
+ this.meterRegistry = Metrics.globalRegistry;
+ }
}
@Override
@@ -309,79 +375,171 @@ protected void close(ExecutionContext executionContext) throws Exception {
@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
- while (this.chunkTracker.moreItems()) {
- // check interruption policy before processing next chunk
- try {
- this.interruptionPolicy.checkInterrupted(stepExecution);
+ stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());
+ while (this.chunkTracker.moreItems() && !interrupted(stepExecution)) {
+ // process next chunk in its own transaction
+ this.transactionTemplate.executeWithoutResult(transactionStatus -> {
+ ChunkTransactionEvent chunkTransactionEvent = new ChunkTransactionEvent(stepExecution.getStepName(),
+ stepExecution.getId());
+ chunkTransactionEvent.begin();
+ StepContribution contribution = stepExecution.createStepContribution();
+ processNextChunk(transactionStatus, contribution, stepExecution);
+ chunkTransactionEvent.transactionStatus = transactionStatus.isRollbackOnly()
+ ? BatchMetrics.STATUS_ROLLED_BACK : BatchMetrics.STATUS_COMMITTED;
+ chunkTransactionEvent.commit();
+ });
+ }
+ }
+
+ private void processNextChunk(TransactionStatus status, StepContribution contribution,
+ StepExecution stepExecution) {
+ if (isConcurrent()) {
+ processChunkConcurrently(status, contribution, stepExecution);
+ }
+ else {
+ processChunkSequentially(status, contribution, stepExecution);
+ }
+ }
+
+ private void processChunkConcurrently(TransactionStatus status, StepContribution contribution,
+ StepExecution stepExecution) {
+ List> itemProcessingTasks = new LinkedList<>();
+ try {
+ // read items and submit concurrent item processing tasks
+ for (int i = 0; i < this.chunkSize; i++) {
+ I item = readItem(contribution);
+ if (item != null) {
+ Future itemProcessingFuture = this.taskExecutor.submit(() -> processItem(item, contribution));
+ itemProcessingTasks.add(itemProcessingFuture);
+ }
}
- catch (JobInterruptedException exception) {
- stepExecution.setTerminateOnly();
- stepExecution.setStatus(BatchStatus.STOPPED);
- stepExecution.setExitStatus(ExitStatus.STOPPED);
+ // exclude empty chunks (when the total items is a multiple of the chunk size)
+ if (itemProcessingTasks.isEmpty()) {
return;
}
- // process next chunk in its own transaction
- this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- @Override
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- StepContribution contribution = stepExecution.createStepContribution();
- Chunk inputChunk = new Chunk<>();
- Chunk processedChunk = new Chunk<>();
- try {
- inputChunk = read(contribution);
- if (inputChunk.isEmpty()) {
- return;
- }
- compositeChunkListener.beforeChunk(inputChunk);
- processedChunk = process(inputChunk, contribution);
- write(processedChunk, contribution);
- compositeChunkListener.afterChunk(processedChunk);
- stepExecution.apply(contribution);
- stepExecution.incrementCommitCount();
- compositeItemStream.update(stepExecution.getExecutionContext());
- getJobRepository().update(stepExecution);
- getJobRepository().updateExecutionContext(stepExecution);
- }
- catch (Exception e) {
- logger.error("Rolling back chunk transaction", e);
- status.setRollbackOnly();
- stepExecution.incrementRollbackCount();
- compositeChunkListener.onChunkError(e, processedChunk);
- throw new FatalStepExecutionException("Unable to process chunk", e);
- }
+
+ // collect processed items
+ Chunk processedChunk = new Chunk<>();
+ for (Future future : itemProcessingTasks) {
+ O processedItem = future.get();
+ if (processedItem != null) {
+ processedChunk.add(processedItem);
}
- });
+ }
+
+ // write processed items
+ writeChunk(processedChunk, contribution);
+ stepExecution.incrementCommitCount();
+ }
+ catch (Exception e) {
+ logger.error("Rolling back chunk transaction", e);
+ status.setRollbackOnly();
+ stepExecution.incrementRollbackCount();
+ throw new FatalStepExecutionException("Unable to process chunk", e);
+ }
+ finally {
+ // apply contribution and update streams
+ stepExecution.apply(contribution);
+ this.compositeItemStream.update(stepExecution.getExecutionContext());
+ }
+
+ }
+
+ private void processChunkSequentially(TransactionStatus status, StepContribution contribution,
+ StepExecution stepExecution) {
+ Chunk inputChunk = new Chunk<>();
+ Chunk processedChunk = new Chunk<>();
+ try {
+ inputChunk = readChunk(contribution);
+ if (inputChunk.isEmpty()) {
+ return;
+ }
+ compositeChunkListener.beforeChunk(inputChunk);
+ processedChunk = processChunk(inputChunk, contribution);
+ writeChunk(processedChunk, contribution);
+ compositeChunkListener.afterChunk(processedChunk);
+ stepExecution.incrementCommitCount();
+ }
+ catch (Exception e) {
+ logger.error("Rolling back chunk transaction", e);
+ status.setRollbackOnly();
+ stepExecution.incrementRollbackCount();
+ compositeChunkListener.onChunkError(e, processedChunk);
+ throw new FatalStepExecutionException("Unable to process chunk", e);
+ }
+ finally {
+ // apply contribution and update streams
+ stepExecution.apply(contribution);
+ compositeItemStream.update(stepExecution.getExecutionContext());
+ }
+ }
+
+ /*
+ * Check if the step has been interrupted either internally via user defined policy or
+ * externally via job operator. This will be checked at chunk boundaries.
+ */
+ private boolean interrupted(StepExecution stepExecution) {
+ // check internal interruption via user defined policy
+ try {
+ this.interruptionPolicy.checkInterrupted(stepExecution);
+ }
+ catch (JobInterruptedException exception) {
+ return true;
+ }
+ // check external interruption via job operator
+ if (stepExecution.isTerminateOnly()) {
+ return true;
}
+ return false;
}
- private Chunk read(StepContribution contribution) throws Exception {
+ private Chunk readChunk(StepContribution contribution) throws Exception {
Chunk chunk = new Chunk<>();
for (int i = 0; i < chunkSize; i++) {
+ I item = readItem(contribution);
+ if (item != null) {
+ chunk.add(item);
+ }
+ }
+ return chunk;
+ }
+
+ @Nullable private I readItem(StepContribution contribution) throws Exception {
+ ItemReadEvent itemReadEvent = new ItemReadEvent(contribution.getStepExecution().getStepName(),
+ contribution.getStepExecution().getId());
+ Timer.Sample sample = startTimerSample();
+ String status = BatchMetrics.STATUS_SUCCESS;
+ I item = null;
+ try {
+ itemReadEvent.begin();
this.compositeItemReadListener.beforeRead();
- try {
- I item = doRead();
- if (item == null) {
- chunkTracker.noMoreItems();
- break;
- }
- else {
- chunk.add(item);
- contribution.incrementReadCount();
- this.compositeItemReadListener.afterRead(item);
- }
+ item = doRead();
+ if (item == null) {
+ this.chunkTracker.noMoreItems();
}
- catch (Exception exception) {
- this.compositeItemReadListener.onReadError(exception);
- if (this.faultTolerant && exception instanceof RetryException retryException) {
- doSkipInRead(retryException, contribution);
- }
- else {
- throw exception;
- }
+ else {
+ contribution.incrementReadCount();
+ this.compositeItemReadListener.afterRead(item);
}
-
+ itemReadEvent.itemReadStatus = BatchMetrics.STATUS_SUCCESS;
}
- return chunk;
+ catch (Exception exception) {
+ this.compositeItemReadListener.onReadError(exception);
+ if (this.faultTolerant && exception instanceof RetryException retryException) {
+ doSkipInRead(retryException, contribution);
+ }
+ else {
+ throw exception;
+ }
+ itemReadEvent.itemReadStatus = BatchMetrics.STATUS_FAILURE;
+ status = BatchMetrics.STATUS_FAILURE;
+ }
+ finally {
+ itemReadEvent.commit();
+ stopTimerSample(sample, contribution.getStepExecution().getJobExecution().getJobInstance().getJobName(),
+ contribution.getStepExecution().getStepName(), "item.read", "Item reading", status);
+ }
+ return item;
}
@Nullable private I doRead() throws Exception {
@@ -407,44 +565,80 @@ public String getName() {
private void doSkipInRead(RetryException retryException, StepContribution contribution) {
Throwable cause = retryException.getCause();
if (this.skipPolicy.shouldSkip(cause, contribution.getStepSkipCount())) {
- this.compositeSkipListener.onSkipInRead(cause);
- contribution.incrementReadSkipCount();
+ try {
+ this.compositeSkipListener.onSkipInRead(cause);
+ contribution.incrementReadSkipCount();
+ }
+ catch (Throwable throwable) {
+ throw new SkipListenerFailedException("Unable to apply onSkipInRead", throwable);
+ }
}
}
- private Chunk process(Chunk chunk, StepContribution contribution) throws Exception {
+ private Chunk processChunk(Chunk chunk, StepContribution contribution) throws Exception {
Chunk processedChunk = new Chunk<>();
for (I item : chunk) {
- try {
- this.compositeItemProcessListener.beforeProcess(item);
- O processedItem = doProcess(item);
- this.compositeItemProcessListener.afterProcess(item, processedItem);
- if (processedItem == null) {
- contribution.incrementFilterCount(1);
- }
- else {
- processedChunk.add(processedItem);
- }
- }
- catch (Exception exception) {
- this.compositeItemProcessListener.onProcessError(item, exception);
- if (this.faultTolerant && exception instanceof RetryException retryException) {
- doSkipInProcess(item, retryException, contribution);
- }
- else {
- throw exception;
- }
+ O processedItem = processItem(item, contribution);
+ if (processedItem != null) {
+ processedChunk.add(processedItem);
}
}
return processedChunk;
}
+ private O processItem(I item, StepContribution contribution) throws Exception {
+ ItemProcessEvent itemProcessEvent = new ItemProcessEvent(contribution.getStepExecution().getStepName(),
+ contribution.getStepExecution().getId());
+ Timer.Sample sample = startTimerSample();
+ String status = BatchMetrics.STATUS_SUCCESS;
+ O processedItem = null;
+ try {
+ itemProcessEvent.begin();
+ this.compositeItemProcessListener.beforeProcess(item);
+ processedItem = doProcess(item);
+ if (processedItem == null) {
+ contribution.incrementFilterCount();
+ }
+ this.compositeItemProcessListener.afterProcess(item, processedItem);
+ itemProcessEvent.itemProcessStatus = BatchMetrics.STATUS_SUCCESS;
+ }
+ catch (Exception exception) {
+ this.compositeItemProcessListener.onProcessError(item, exception);
+ if (this.faultTolerant && exception instanceof RetryException retryException) {
+ doSkipInProcess(item, retryException, contribution);
+ }
+ else {
+ throw exception;
+ }
+ itemProcessEvent.itemProcessStatus = BatchMetrics.STATUS_FAILURE;
+ status = BatchMetrics.STATUS_FAILURE;
+ }
+ finally {
+ itemProcessEvent.commit();
+ stopTimerSample(sample, contribution.getStepExecution().getJobExecution().getJobInstance().getJobName(),
+ contribution.getStepExecution().getStepName(), "item.process", "Item processing", status);
+ }
+ return processedItem;
+ }
+
@Nullable private O doProcess(I item) throws Exception {
if (this.faultTolerant) {
Retryable retryableProcess = new Retryable<>() {
@Override
public @Nullable O execute() throws Throwable {
- return itemProcessor.process(item);
+ StepContext context = StepSynchronizationManager.getContext();
+ final StepExecution stepExecution = context == null ? null : context.getStepExecution();
+ if (isConcurrent() && stepExecution != null) {
+ StepSynchronizationManager.register(stepExecution);
+ }
+ try {
+ return itemProcessor.process(item);
+ }
+ finally {
+ if (isConcurrent() && stepExecution != null) {
+ StepSynchronizationManager.close();
+ }
+ }
}
@Override
@@ -462,29 +656,52 @@ public String getName() {
private void doSkipInProcess(I item, RetryException retryException, StepContribution contribution) {
Throwable cause = retryException.getCause();
if (this.skipPolicy.shouldSkip(cause, contribution.getStepSkipCount())) {
- this.compositeSkipListener.onSkipInProcess(item, retryException.getCause());
- contribution.incrementProcessSkipCount();
+ try {
+ this.compositeSkipListener.onSkipInProcess(item, retryException.getCause());
+ contribution.incrementProcessSkipCount();
+ }
+ catch (Throwable throwable) {
+ throw new SkipListenerFailedException("Unable to apply onSkipInProcess", throwable);
+ }
}
}
- private void write(Chunk chunk, StepContribution contribution) throws Exception {
+ private void writeChunk(Chunk chunk, StepContribution contribution) throws Exception {
+ ChunkWriteEvent chunkWriteEvent = new ChunkWriteEvent(contribution.getStepExecution().getStepName(),
+ contribution.getStepExecution().getId(), chunk.size());
+ Timer.Sample sample = startTimerSample();
+ String status = BatchMetrics.STATUS_SUCCESS;
try {
+ chunkWriteEvent.begin();
this.compositeItemWriteListener.beforeWrite(chunk);
doWrite(chunk);
contribution.incrementWriteCount(chunk.size());
this.compositeItemWriteListener.afterWrite(chunk);
+ chunkWriteEvent.chunkWriteStatus = BatchMetrics.STATUS_SUCCESS;
}
catch (Exception exception) {
this.compositeItemWriteListener.onWriteError(exception, chunk);
+ chunkWriteEvent.chunkWriteStatus = BatchMetrics.STATUS_FAILURE;
+ status = BatchMetrics.STATUS_FAILURE;
if (this.faultTolerant && exception instanceof RetryException retryException) {
logger.info("Retry exhausted while attempting to write items, scanning the chunk", retryException);
+ ChunkScanEvent chunkScanEvent = new ChunkScanEvent(contribution.getStepExecution().getStepName(),
+ contribution.getStepExecution().getId());
+ chunkScanEvent.begin();
scan(chunk, contribution);
+ chunkScanEvent.skipCount = contribution.getSkipCount();
+ chunkScanEvent.commit();
logger.info("Chunk scan completed");
}
else {
throw exception;
}
}
+ finally {
+ chunkWriteEvent.commit();
+ stopTimerSample(sample, contribution.getStepExecution().getJobExecution().getJobInstance().getJobName(),
+ contribution.getStepExecution().getStepName(), "chunk.write", "Chunk writing", status);
+ }
}
private void doWrite(Chunk chunk) throws Exception {
@@ -519,8 +736,13 @@ private void scan(Chunk chunk, StepContribution contribution) {
}
catch (Exception exception) {
if (this.skipPolicy.shouldSkip(exception, contribution.getStepSkipCount())) {
- this.compositeSkipListener.onSkipInWrite(item, exception);
- contribution.incrementWriteSkipCount();
+ try {
+ this.compositeSkipListener.onSkipInWrite(item, exception);
+ contribution.incrementWriteSkipCount();
+ }
+ catch (Throwable throwable) {
+ throw new SkipListenerFailedException("Unable to apply onSkipInWrite", throwable);
+ }
}
else {
logger.error("Failed to write item: " + item, exception);
@@ -530,6 +752,23 @@ private void scan(Chunk chunk, StepContribution contribution) {
}
}
+ private Timer.Sample startTimerSample() {
+ return BatchMetrics.createTimerSample(this.meterRegistry);
+ }
+
+ private void stopTimerSample(Timer.Sample sample, String jobName, String stepName, String operation,
+ String description, String status) {
+ String fullyQualifiedMetricName = BatchMetrics.METRICS_PREFIX + operation;
+ sample.stop(BatchMetrics.createTimer(this.meterRegistry, operation, description + " duration",
+ Tag.of(fullyQualifiedMetricName + ".job.name", jobName),
+ Tag.of(fullyQualifiedMetricName + ".step.name", stepName),
+ Tag.of(fullyQualifiedMetricName + ".status", status)));
+ }
+
+ private boolean isConcurrent() {
+ return this.taskExecutor != null;
+ }
+
private static class ChunkTracker {
private boolean moreItems = true;
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedTasklet.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedTasklet.java
index cd780c2977..d6a7bb47c8 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedTasklet.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkOrientedTasklet.java
@@ -31,7 +31,11 @@
* @author Dave Syer
* @author Mahmoud Ben Hassine
* @param input item type
+ * @deprecated Since 6.0, use
+ * {@link org.springframework.batch.core.step.item.ChunkOrientedStep} instead. Scheduled
+ * for removal in 7.0.
*/
+@Deprecated(since = "6.0", forRemoval = true)
public class ChunkOrientedTasklet implements Tasklet {
private static final String INPUTS_KEY = "INPUTS";
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkProcessor.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkProcessor.java
index ed29a3e005..757f8422b5 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkProcessor.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkProcessor.java
@@ -24,7 +24,9 @@
*
* @author Kyeonghoon Lee (Add FunctionalInterface annotation)
* @since 2.0
+ * @deprecated Since 6.0 with no replacement. Scheduled for removal in 7.0.
*/
+@Deprecated(since = "6.0", forRemoval = true)
@FunctionalInterface
public interface ChunkProcessor {
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkProvider.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkProvider.java
index 2541148a42..ceb2ef825b 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkProvider.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/ChunkProvider.java
@@ -25,7 +25,9 @@
*
* @since 2.0
* @see ChunkOrientedTasklet
+ * @deprecated Since 6.0 with no replacement. Scheduled for removal in 7.0.
*/
+@Deprecated(since = "6.0", forRemoval = true)
public interface ChunkProvider {
Chunk provide(StepContribution contribution) throws Exception;
diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/DefaultItemFailureHandler.java b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/DefaultItemFailureHandler.java
index e51403aeb6..8d47d4e3f9 100644
--- a/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/DefaultItemFailureHandler.java
+++ b/spring-batch-core/src/main/java/org/springframework/batch/core/step/item/DefaultItemFailureHandler.java
@@ -29,8 +29,9 @@
*
* @author Lucas Ward
* @author Mahmoud Ben Hassine
- *
+ * @deprecated Since 6.0 with no replacement. Scheduled for removal in 7.0.
*/
+@Deprecated(since = "6.0", forRemoval = true)
public class DefaultItemFailureHandler extends ItemListenerSupport