Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions docs/en/concept/event-listener.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,33 @@ seatunnel:
url: "http://example.com:1024/event/report"
headers:
Content-Type: application/json
report-non-terminal-job-state: false
```

#### report-non-terminal-job-state

- Type: boolean

- Default: false

Whether to report non-terminal job state events to the configured HTTP endpoint.

When set to `true`, the engine will report job state change events for non-terminal states, including:

- `PENDING`

- `SCHEDULED`

- `RUNNING`

- `FAILING`

- `CANCELING`

- `DOING_SAVEPOINT`

When set to `false`, only terminal job states (such as `FINISHED`, `FAILED`, `CANCELED`, `SAVEPOINT_DONE`) will be reported, and transitions into non-terminal states will be ignored.

### Flink Engine

You can define the implementation class of `org.apache.seatunnel.api.event.EventHandler` interface and add to the classpath to automatically load it through SPI.
Expand Down
27 changes: 27 additions & 0 deletions docs/zh/concept/event-listener.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,35 @@ seatunnel:
url: "http://example.com:1024/event/report"
headers:
Content-Type: application/json
report-non-terminal-job-state: false
```

#### report-non-terminal-job-state

- 类型:boolean

- 默认值:false

是否将非终态(Non-terminal)作业状态的状态变更事件上报到配置的 HTTP 接口。

当该配置为 `true` 时,Engine 会上报以下非终态作业状态的变更事件:

- `PENDING`

- `SCHEDULED`

- `RUNNING`

- `FAILING`

- `CANCELING`

- `DOING_SAVEPOINT`

当该配置为 `false` 时,仅会上报终态(Terminal)作业状态(例如 `FINISHED`,`FAILED`,`CANCELED`,`SAVEPOINT_DONE`),非终态状态的变更将被忽略。

该配置项适用于仅关注作业最终结果、希望减少事件上报数量的场景。

### Flink 引擎

您可以定义 `org.apache.seatunnel.api.event.EventHandler` 接口并添加到类路径,SPI会自动加载。
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class EngineConfig {

private String eventReportHttpApi;
private Map<String, String> eventReportHttpHeaders = Collections.emptyMap();
private boolean reportNonTerminalJobState = false;

private ExecutionMode mode = ExecutionMode.CLUSTER;

Expand Down Expand Up @@ -140,6 +141,10 @@ public void setJobMetricsPartitionCount(int jobMetricsPartitionCount) {
this.jobMetricsPartitionCount = jobMetricsPartitionCount;
}

public void setReportNonTerminalJobState(boolean reportNonTerminalJobState) {
this.reportNonTerminalJobState = reportNonTerminalJobState;
}

public void setTaskExecutionThreadShareMode(ThreadShareMode taskExecutionThreadShareMode) {
checkNotNull(queueType);
this.taskExecutionThreadShareMode = taskExecutionThreadShareMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,15 @@ private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) {
}
engineConfig.setEventReportHttpHeaders(headers);
}

Node reportNonTerminalJobStateNode =
attributes.getNamedItem(
ServerConfigOptions.MasterServerConfigOptions
.REPORT_NON_TERMINAL_JOB_STATE);
if (reportNonTerminalJobStateNode != null) {
engineConfig.setReportNonTerminalJobState(
getBooleanValue(getTextContent(reportNonTerminalJobStateNode)));
}
}
} else if (ServerConfigOptions.TELEMETRY.key().equals(name)) {
engineConfig.setTelemetryConfig(parseTelemetryConfig(node));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ public static class MasterServerConfigOptions {
public static final String EVENT_REPORT_HTTP = "event-report-http";
public static final String EVENT_REPORT_HTTP_URL = "url";
public static final String EVENT_REPORT_HTTP_HEADERS = "headers";
public static final String REPORT_NON_TERMINAL_JOB_STATE = "report-non-terminal-job-state";

// The options for http server end
/////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1161,6 +1161,10 @@ public int getPendingJobCount() {
return pendingJobQueue.getJobIdMap().size();
}

public EngineConfig getEngineConfig() {
return engineConfig;
}

@VisibleForTesting
protected IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> getMetricsImap() {
return metricsImap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.job.JobResult;
import org.apache.seatunnel.engine.common.job.JobStateEvent;
Expand Down Expand Up @@ -80,6 +81,8 @@ public class PhysicalPlan {

private JobMaster jobMaster;

private EngineConfig engineConfig;

private Map<TaskGroupLocation, CompletableFuture<SlotProfile>> preApplyResourceFutures =
new HashMap<>();

Expand Down Expand Up @@ -133,6 +136,7 @@ public PhysicalPlan(
public void setJobMaster(JobMaster jobMaster) {
this.jobMaster = jobMaster;
pipelineList.forEach(pipeline -> pipeline.setJobMaster(jobMaster));
this.engineConfig = jobMaster.getEngineConfig();
}

public PassiveCompletableFuture<JobResult> initStateFuture() {
Expand Down Expand Up @@ -274,6 +278,18 @@ public synchronized void updateJobState(@NonNull JobStatus targetState) {
log.info(
String.format(
"%s turned from state %s to %s.", jobFullName, current, targetState));
if (!targetState.isEndState()
&& this.engineConfig != null
&& this.engineConfig.isReportNonTerminalJobState()) {
jobMaster
.getCoordinatorService()
.getEventProcessor()
.process(
new JobStateEvent(
jobImmutableInformation.getJobId(),
jobImmutableInformation.getJobConfig().getName(),
targetState));
}
stateProcess();
} catch (Exception e) {
log.error(ExceptionUtils.getMessage(e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
import static org.apache.seatunnel.engine.server.checkpoint.CheckpointErrorRestoreEndTest.STREAM_CONF_WITH_ERROR_PATH;
import static org.awaitility.Awaitility.await;

public class JobStateEventTest extends AbstractSeaTunnelServerTest {
class JobStateEventTest extends AbstractSeaTunnelServerTest {

@Test
public void testJobStateEvent() throws InterruptedException {
void testJobStateEvent() {

JobEventProcessor eventProcessor =
(JobEventProcessor) server.getCoordinatorService().getEventProcessor();
Expand Down Expand Up @@ -68,40 +68,111 @@ public void testJobStateEvent() throws InterruptedException {
List<EventHandler> handlers =
(List<EventHandler>) ReflectionUtils.getField(eventProcessor, "handlers").get();
handlers.add(eventHandler);
long jobId_finished = System.currentTimeMillis();
long jobIdFinished = System.currentTimeMillis();
long currentTimeMillis = System.currentTimeMillis();
startJob(jobId_finished, "fake_to_console.conf", false);
startJob(jobIdFinished, "fake_to_console.conf", false);
await().atMost(60, TimeUnit.SECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FINISHED,
server.getCoordinatorService()
.getJobStatus(jobId_finished)));
.getJobStatus(jobIdFinished)));
// check whether the event handler is executed
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> Assertions.assertEquals(1, accessCounter.get()));
JobStateEvent jobStateEventFinished = jobStateEventReference.get();
Assertions.assertEquals(String.valueOf(jobId_finished), jobStateEventFinished.getJobId());
Assertions.assertEquals(String.valueOf(jobIdFinished), jobStateEventFinished.getJobId());
Assertions.assertEquals(JobStatus.FINISHED, jobStateEventFinished.getJobStatus());
Assertions.assertTrue(jobStateEventFinished.getCreatedTime() > currentTimeMillis);
Assertions.assertEquals(String.valueOf(jobId_finished), jobStateEventFinished.getJobName());
Assertions.assertEquals(String.valueOf(jobIdFinished), jobStateEventFinished.getJobName());

long jobId_failed = System.currentTimeMillis();
startJob(jobId_failed, STREAM_CONF_WITH_ERROR_PATH, false);
long jobIdFailed = System.currentTimeMillis();
startJob(jobIdFailed, STREAM_CONF_WITH_ERROR_PATH, false);
await().atMost(60, TimeUnit.SECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FAILED,
server.getCoordinatorService().getJobStatus(jobId_failed)));
server.getCoordinatorService().getJobStatus(jobIdFailed)));

await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> Assertions.assertEquals(2, accessCounter.get()));
JobStateEvent jobStateEventFailed = jobStateEventReference.get();
Assertions.assertEquals(String.valueOf(jobId_failed), jobStateEventFailed.getJobId());
Assertions.assertEquals(String.valueOf(jobIdFailed), jobStateEventFailed.getJobId());
Assertions.assertEquals(JobStatus.FAILED, jobStateEventFailed.getJobStatus());
Assertions.assertTrue(jobStateEventFailed.getCreatedTime() > currentTimeMillis);
Assertions.assertEquals(String.valueOf(jobId_failed), jobStateEventFailed.getJobName());
Assertions.assertEquals(String.valueOf(jobIdFailed), jobStateEventFailed.getJobName());
}

@Test
void testNotEndJobStateEvent() {
server.getCoordinatorService().getEngineConfig().setReportNonTerminalJobState(true);

JobEventProcessor eventProcessor =
(JobEventProcessor) server.getCoordinatorService().getEventProcessor();

AtomicInteger accessCounter = new AtomicInteger(0);
AtomicReference<JobStateEvent> jobStateEventReference = new AtomicReference<>();
EventHandler eventHandler =
event -> {
if (event.getEventType() != EventType.JOB_STATUS) {
return;
}
JobStateEvent jobStateEvent = (JobStateEvent) event;
JobStatus status = jobStateEvent.getJobStatus();
switch (status) {
case PENDING:
case SCHEDULED:
case RUNNING:
case DOING_SAVEPOINT:
case FAILING:
case CANCELING:
accessCounter.incrementAndGet();
jobStateEventReference.lazySet(jobStateEvent);
break;
default:
break;
}
};
// register the event handler
List<EventHandler> handlers =
(List<EventHandler>) ReflectionUtils.getField(eventProcessor, "handlers").get();
handlers.add(eventHandler);
long jobIdFinished = System.currentTimeMillis();
long currentTimeMillis = System.currentTimeMillis();
startJob(jobIdFinished, "fake_to_console.conf", false);
await().atMost(60, TimeUnit.SECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FINISHED,
server.getCoordinatorService()
.getJobStatus(jobIdFinished)));
// check whether the event handler is executed
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> Assertions.assertEquals(3, accessCounter.get()));
JobStateEvent jobStateEventFinished = jobStateEventReference.get();
Assertions.assertEquals(String.valueOf(jobIdFinished), jobStateEventFinished.getJobId());
Assertions.assertEquals(JobStatus.RUNNING, jobStateEventFinished.getJobStatus());
Assertions.assertTrue(jobStateEventFinished.getCreatedTime() > currentTimeMillis);
Assertions.assertEquals(String.valueOf(jobIdFinished), jobStateEventFinished.getJobName());

long jobIdFailed = System.currentTimeMillis();
startJob(jobIdFailed, STREAM_CONF_WITH_ERROR_PATH, false);
await().atMost(60, TimeUnit.SECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FAILED,
server.getCoordinatorService().getJobStatus(jobIdFailed)));

await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> Assertions.assertEquals(7, accessCounter.get()));
JobStateEvent jobStateEventFailed = jobStateEventReference.get();
Assertions.assertEquals(String.valueOf(jobIdFailed), jobStateEventFailed.getJobId());
Assertions.assertEquals(JobStatus.FAILING, jobStateEventFailed.getJobStatus());
Assertions.assertTrue(jobStateEventFailed.getCreatedTime() > currentTimeMillis);
Assertions.assertEquals(String.valueOf(jobIdFailed), jobStateEventFailed.getJobName());
}
}