Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/en/concept/event-listener.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ seatunnel:
url: "http://example.com:1024/event/report"
headers:
Content-Type: application/json
report-non-terminal-job-state: false
```

### Flink Engine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class EngineConfig {

private String eventReportHttpApi;
private Map<String, String> eventReportHttpHeaders = Collections.emptyMap();
private boolean reportNonTerminalJobState = false;

private ExecutionMode mode = ExecutionMode.CLUSTER;

Expand Down Expand Up @@ -140,6 +141,10 @@ public void setJobMetricsPartitionCount(int jobMetricsPartitionCount) {
this.jobMetricsPartitionCount = jobMetricsPartitionCount;
}

public void setReportNonTerminalJobState(boolean reportNonTerminalJobState) {
this.reportNonTerminalJobState = reportNonTerminalJobState;
}

public void setTaskExecutionThreadShareMode(ThreadShareMode taskExecutionThreadShareMode) {
checkNotNull(queueType);
this.taskExecutionThreadShareMode = taskExecutionThreadShareMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,15 @@ private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) {
}
engineConfig.setEventReportHttpHeaders(headers);
}

Node reportNonTerminalJobStateNode =
attributes.getNamedItem(
ServerConfigOptions.MasterServerConfigOptions
.REPORT_NON_TERMINAL_JOB_STATE);
if (reportNonTerminalJobStateNode != null) {
engineConfig.setReportNonTerminalJobState(
getBooleanValue(getTextContent(reportNonTerminalJobStateNode)));
}
}
} else if (ServerConfigOptions.TELEMETRY.key().equals(name)) {
engineConfig.setTelemetryConfig(parseTelemetryConfig(node));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ public static class MasterServerConfigOptions {
public static final String EVENT_REPORT_HTTP = "event-report-http";
public static final String EVENT_REPORT_HTTP_URL = "url";
public static final String EVENT_REPORT_HTTP_HEADERS = "headers";
public static final String REPORT_NON_TERMINAL_JOB_STATE = "report-non-terminal-job-state";

// The options for http server end
/////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1161,6 +1161,10 @@ public int getPendingJobCount() {
return pendingJobQueue.getJobIdMap().size();
}

public EngineConfig getEngineConfig() {
return engineConfig;
}

@VisibleForTesting
protected IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> getMetricsImap() {
return metricsImap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.job.JobResult;
import org.apache.seatunnel.engine.common.job.JobStateEvent;
Expand Down Expand Up @@ -80,6 +81,8 @@ public class PhysicalPlan {

private JobMaster jobMaster;

private EngineConfig engineConfig;

private Map<TaskGroupLocation, CompletableFuture<SlotProfile>> preApplyResourceFutures =
new HashMap<>();

Expand Down Expand Up @@ -133,6 +136,7 @@ public PhysicalPlan(
public void setJobMaster(JobMaster jobMaster) {
this.jobMaster = jobMaster;
pipelineList.forEach(pipeline -> pipeline.setJobMaster(jobMaster));
this.engineConfig = jobMaster.getEngineConfig();
}

public PassiveCompletableFuture<JobResult> initStateFuture() {
Expand Down Expand Up @@ -274,6 +278,18 @@ public synchronized void updateJobState(@NonNull JobStatus targetState) {
log.info(
String.format(
"%s turned from state %s to %s.", jobFullName, current, targetState));
if (!targetState.isEndState()
&& this.engineConfig != null
&& this.engineConfig.isReportNonTerminalJobState()) {
jobMaster
.getCoordinatorService()
.getEventProcessor()
.process(
new JobStateEvent(
jobImmutableInformation.getJobId(),
jobImmutableInformation.getJobConfig().getName(),
targetState));
}
stateProcess();
} catch (Exception e) {
log.error(ExceptionUtils.getMessage(e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
import static org.apache.seatunnel.engine.server.checkpoint.CheckpointErrorRestoreEndTest.STREAM_CONF_WITH_ERROR_PATH;
import static org.awaitility.Awaitility.await;

public class JobStateEventTest extends AbstractSeaTunnelServerTest {
class JobStateEventTest extends AbstractSeaTunnelServerTest {

@Test
public void testJobStateEvent() throws InterruptedException {
void testJobStateEvent() {

JobEventProcessor eventProcessor =
(JobEventProcessor) server.getCoordinatorService().getEventProcessor();
Expand Down Expand Up @@ -68,40 +68,111 @@ public void testJobStateEvent() throws InterruptedException {
List<EventHandler> handlers =
(List<EventHandler>) ReflectionUtils.getField(eventProcessor, "handlers").get();
handlers.add(eventHandler);
long jobId_finished = System.currentTimeMillis();
long jobIdFinished = System.currentTimeMillis();
long currentTimeMillis = System.currentTimeMillis();
startJob(jobId_finished, "fake_to_console.conf", false);
startJob(jobIdFinished, "fake_to_console.conf", false);
await().atMost(60, TimeUnit.SECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FINISHED,
server.getCoordinatorService()
.getJobStatus(jobId_finished)));
.getJobStatus(jobIdFinished)));
// check whether the event handler is executed
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> Assertions.assertEquals(1, accessCounter.get()));
JobStateEvent jobStateEventFinished = jobStateEventReference.get();
Assertions.assertEquals(String.valueOf(jobId_finished), jobStateEventFinished.getJobId());
Assertions.assertEquals(String.valueOf(jobIdFinished), jobStateEventFinished.getJobId());
Assertions.assertEquals(JobStatus.FINISHED, jobStateEventFinished.getJobStatus());
Assertions.assertTrue(jobStateEventFinished.getCreatedTime() > currentTimeMillis);
Assertions.assertEquals(String.valueOf(jobId_finished), jobStateEventFinished.getJobName());
Assertions.assertEquals(String.valueOf(jobIdFinished), jobStateEventFinished.getJobName());

long jobId_failed = System.currentTimeMillis();
startJob(jobId_failed, STREAM_CONF_WITH_ERROR_PATH, false);
long jobIdFailed = System.currentTimeMillis();
startJob(jobIdFailed, STREAM_CONF_WITH_ERROR_PATH, false);
await().atMost(60, TimeUnit.SECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FAILED,
server.getCoordinatorService().getJobStatus(jobId_failed)));
server.getCoordinatorService().getJobStatus(jobIdFailed)));

await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> Assertions.assertEquals(2, accessCounter.get()));
JobStateEvent jobStateEventFailed = jobStateEventReference.get();
Assertions.assertEquals(String.valueOf(jobId_failed), jobStateEventFailed.getJobId());
Assertions.assertEquals(String.valueOf(jobIdFailed), jobStateEventFailed.getJobId());
Assertions.assertEquals(JobStatus.FAILED, jobStateEventFailed.getJobStatus());
Assertions.assertTrue(jobStateEventFailed.getCreatedTime() > currentTimeMillis);
Assertions.assertEquals(String.valueOf(jobId_failed), jobStateEventFailed.getJobName());
Assertions.assertEquals(String.valueOf(jobIdFailed), jobStateEventFailed.getJobName());
}

@Test
void testNotEndJobStateEvent() {
server.getCoordinatorService().getEngineConfig().setReportNonTerminalJobState(true);

JobEventProcessor eventProcessor =
(JobEventProcessor) server.getCoordinatorService().getEventProcessor();

AtomicInteger accessCounter = new AtomicInteger(0);
AtomicReference<JobStateEvent> jobStateEventReference = new AtomicReference<>();
EventHandler eventHandler =
event -> {
if (event.getEventType() != EventType.JOB_STATUS) {
return;
}
JobStateEvent jobStateEvent = (JobStateEvent) event;
JobStatus status = jobStateEvent.getJobStatus();
switch (status) {
case PENDING:
case SCHEDULED:
case RUNNING:
case DOING_SAVEPOINT:
case FAILING:
case CANCELING:
accessCounter.incrementAndGet();
jobStateEventReference.lazySet(jobStateEvent);
break;
default:
break;
}
};
// register the event handler
List<EventHandler> handlers =
(List<EventHandler>) ReflectionUtils.getField(eventProcessor, "handlers").get();
handlers.add(eventHandler);
long jobIdFinished = System.currentTimeMillis();
long currentTimeMillis = System.currentTimeMillis();
startJob(jobIdFinished, "fake_to_console.conf", false);
await().atMost(60, TimeUnit.SECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FINISHED,
server.getCoordinatorService()
.getJobStatus(jobIdFinished)));
// check whether the event handler is executed
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> Assertions.assertEquals(3, accessCounter.get()));
JobStateEvent jobStateEventFinished = jobStateEventReference.get();
Assertions.assertEquals(String.valueOf(jobIdFinished), jobStateEventFinished.getJobId());
Assertions.assertEquals(JobStatus.RUNNING, jobStateEventFinished.getJobStatus());
Assertions.assertTrue(jobStateEventFinished.getCreatedTime() > currentTimeMillis);
Assertions.assertEquals(String.valueOf(jobIdFinished), jobStateEventFinished.getJobName());

long jobIdFailed = System.currentTimeMillis();
startJob(jobIdFailed, STREAM_CONF_WITH_ERROR_PATH, false);
await().atMost(60, TimeUnit.SECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FAILED,
server.getCoordinatorService().getJobStatus(jobIdFailed)));

await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> Assertions.assertEquals(7, accessCounter.get()));
JobStateEvent jobStateEventFailed = jobStateEventReference.get();
Assertions.assertEquals(String.valueOf(jobIdFailed), jobStateEventFailed.getJobId());
Assertions.assertEquals(JobStatus.FAILING, jobStateEventFailed.getJobStatus());
Assertions.assertTrue(jobStateEventFailed.getCreatedTime() > currentTimeMillis);
Assertions.assertEquals(String.valueOf(jobIdFailed), jobStateEventFailed.getJobName());
}
}