Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ private ExecutableId(String id) {
this.id = id;
}

/** Only used for deserialization, in the InboundInstancesRestController for instance. */
@JsonCreator
private static ExecutableId fromHashedId(String id) {
public static ExecutableId fromHashedId(String id) {
return new ExecutableId(id);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
*/
package io.camunda.connector.runtime.inbound.controller;

import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.runtime.core.inbound.ExecutableId;
import io.camunda.connector.runtime.core.inbound.ProcessElementWithRuntimeData;
import java.util.Collection;
import java.util.List;
import java.util.Map;

Expand All @@ -29,4 +31,5 @@ public record ActiveInboundConnectorResponse(
List<ProcessElementWithRuntimeData> elements,
Map<String, String> data,
Health health,
Long activationTimestamp) {}
Long activationTimestamp,
Collection<Activity> logs) {}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private List<Collection<InstanceAwareModel.InstanceAwareActivity>> getActivityLo
String tenantId, String bpmnProcessId, String elementId, String hostname) {
var result =
executableRegistry.query(
new ActiveExecutableQuery(bpmnProcessId, elementId, null, tenantId));
f -> f.bpmnProcessId(bpmnProcessId).elementId(elementId).tenantId(tenantId));
return result.stream()
.map(ActiveExecutableResponse::logs)
.filter(Predicate.not(Collection::isEmpty))
Expand All @@ -106,12 +106,10 @@ private List<Collection<InstanceAwareModel.InstanceAwareActivity>> getActivityLo
private List<ActiveInboundConnectorResponse> getActiveInboundConnectors(
String bpmnProcessId, String elementId, String type, String tenantId) {
return executableRegistry
.query(new ActiveExecutableQuery(bpmnProcessId, elementId, type, tenantId))
.query(
f -> f.bpmnProcessId(bpmnProcessId).elementId(elementId).type(type).tenantId(tenantId))
.stream()
.map(
response ->
connectorDataMapper.createActiveInboundConnectorResponse(
response, ConnectorDataMapper.WEBHOOK_MAPPER))
.map(connectorDataMapper::createActiveInboundConnectorResponse)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,12 @@ public ConnectorInstances getConnectorInstance(
public ActiveInboundConnectorResponse getConnectorInstanceExecutable(
HttpServletRequest request,
@RequestHeader(name = X_CAMUNDA_FORWARDED_FOR, required = false) String forwardedFor,
@PathVariable(name = "type") String type,
@PathVariable(name = "executableId") String executableId) {
return Optional.ofNullable(
instanceForwardingRouter.forwardToInstancesAndReduceOrLocal(
request,
forwardedFor,
() -> inboundInstancesService.findExecutable(type, executableId),
() -> inboundInstancesService.findExecutable(executableId),
Comment thread
johnBgood marked this conversation as resolved.
new TypeReference<>() {}))
.orElseThrow(
() -> new DataNotFoundException(ActiveInboundConnectorResponse.class, executableId));
Expand All @@ -100,26 +99,39 @@ public ActiveInboundConnectorResponse getConnectorInstanceExecutable(
public List<InstanceAwareModel.InstanceAwareHealth> getConnectorInstanceExecutableHealth(
HttpServletRequest request,
@RequestHeader(name = X_CAMUNDA_FORWARDED_FOR, required = false) String forwardedFor,
@PathVariable(name = "type") String type,
@PathVariable(name = "executableId") String executableId) {

return instanceForwardingRouter.forwardToInstancesAndReduceOrLocal(
request,
forwardedFor,
() -> inboundInstancesService.findInstanceAwareHealth(type, executableId, hostname),
() -> inboundInstancesService.findInstanceAwareHealth(executableId, hostname),
new TypeReference<>() {});
}

@GetMapping("/{type}/executables/{executableId}/logs")
public List<InstanceAwareModel.InstanceAwareActivity> getConnectorInstanceExecutableLogs(
HttpServletRequest request,
@RequestHeader(name = X_CAMUNDA_FORWARDED_FOR, required = false) String forwardedFor,
@PathVariable(name = "type") String type,
@PathVariable(name = "executableId") String executableId) {
return instanceForwardingRouter.forwardToInstancesAndReduceOrLocal(
request,
forwardedFor,
() -> inboundInstancesService.findInstanceAwareActivityLogs(type, executableId, hostname),
() -> inboundInstancesService.findInstanceAwareActivityLogs(executableId, hostname),
new TypeReference<>() {});
}

@PostMapping("/{type}/executables/{executableId}/reset")
public ActiveInboundConnectorResponse resetConnectorInstanceExecutable(
HttpServletRequest request,
@RequestHeader(name = X_CAMUNDA_FORWARDED_FOR, required = false) String forwardedFor,
@PathVariable(name = "executableId") String executableId) {
return Optional.ofNullable(
instanceForwardingRouter.forwardToInstancesAndReduceOrLocal(
request,
forwardedFor,
() -> inboundInstancesService.resetExecutable(executableId),
new TypeReference<>() {}))
.orElseThrow(
() -> new DataNotFoundException(ActiveInboundConnectorResponse.class, executableId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,64 @@
*/
package io.camunda.connector.runtime.inbound.executable;

public record ActiveExecutableQuery(
String bpmnProcessId, String elementId, String type, String tenantId) {}
import io.camunda.connector.runtime.core.inbound.ExecutableId;

/** Mutable filter used with the consumer-based {@code query} API. */
public class ActiveExecutableQuery {

private String bpmnProcessId;
private String elementId;
private String type;
private String tenantId;
private ExecutableId executableId;

public ActiveExecutableQuery bpmnProcessId(String bpmnProcessId) {
this.bpmnProcessId = bpmnProcessId;
return this;
}

public ActiveExecutableQuery elementId(String elementId) {
this.elementId = elementId;
return this;
}

public ActiveExecutableQuery type(String type) {
this.type = type;
return this;
}

public ActiveExecutableQuery tenantId(String tenantId) {
this.tenantId = tenantId;
return this;
}

public ActiveExecutableQuery executableId(ExecutableId executableId) {
this.executableId = executableId;
return this;
}

public ActiveExecutableQuery executableId(String executableId) {
this.executableId = executableId == null ? null : ExecutableId.fromHashedId(executableId);
return this;
}

public String bpmnProcessId() {
return bpmnProcessId;
}

public String elementId() {
return elementId;
}

public String type() {
return type;
}

public String tenantId() {
return tenantId;
}

public ExecutableId executableId() {
return executableId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.camunda.connector.runtime.inbound.executable.RegisteredExecutable.InvalidDefinition;
import io.camunda.connector.runtime.inbound.webhook.WebhookConnectorRegistry;
import io.camunda.connector.runtime.metrics.ConnectorsInboundMetrics;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
Expand Down Expand Up @@ -175,16 +176,9 @@ private RegisteredExecutable activateSingle(
"Webhook connectors are not supported in this environment")));
return new ConnectorNotRegistered(validData, id);
}
final Activated activated;
try {
if (executable instanceof WebhookConnectorExecutable) {
LOG.debug("Registering webhook: {}", data.type());
if (webhookConnectorRegistry.register(
new RegisteredExecutable.Activated(executable, context, id))) {
executable.activate(context);
}
} else {
executable.activate(context);
}
activated = doActivate(executable, context, id);
} catch (Exception e) {
LOG.error("Failed to activate connector", e);
connectorsInboundMetrics.increaseActivationFailure(data.connectorElements().getFirst());
Expand All @@ -200,48 +194,115 @@ private RegisteredExecutable activateSingle(
"Activated inbound connector %s with deduplication ID '%s'",
data.type(), data.deduplicationId())));
connectorsInboundMetrics.increaseActivation(data.connectorElements().getFirst());
return new Activated(executable, context, id);
return activated;
}

/** Deactivates a batch of inbound connectors. */
public void deactivateBatch(List<RegisteredExecutable> executables) {
for (var activeExecutable : executables) {
if (activeExecutable instanceof Activated activated) {
try {
if (activated.executable() instanceof WebhookConnectorExecutable) {
LOG.debug("Unregistering webhook: {}", activated.context().getDefinition().type());
webhookConnectorRegistry.deregister(activated);
}
activated.executable().deactivate();
log(
activeExecutable.id(),
Activity.newBuilder()
.withSeverity(Severity.INFO)
.withTag(ActivityLogTag.LIFECYCLE)
.withMessage(
"Deactivated executable: "
+ activated.context().getDefinition().type()
+ " with executable ID "
+ activated.id()));
} catch (Exception e) {
LOG.error("Failed to deactivate executable", e);
/**
* Handles the webhook registration (if applicable) and activates the executable. Throws on
* failure so callers can apply their own error-handling strategy.
*/
private Activated doActivate(
InboundConnectorExecutable<InboundConnectorContext> executable,
InboundConnectorManagementContext context,
ExecutableId id)
throws Exception {
var activated = new Activated(executable, context, id);
if (executable instanceof WebhookConnectorExecutable) {
LOG.debug("Registering webhook: {}", context.getDefinition().type());
if (webhookConnectorRegistry.register(activated)) {
executable.activate(context);
}
} else {
executable.activate(context);
}
return activated;
}

/** Deactivates a single inbound connector. */
public void deactivateSingle(RegisteredExecutable executable) {
if (executable instanceof Activated activated) {
try {
if (activated.executable() instanceof WebhookConnectorExecutable) {
LOG.debug("Unregistering webhook: {}", activated.context().getDefinition().type());
webhookConnectorRegistry.deregister(activated);
}
connectorsInboundMetrics.increaseDeactivation(
activated.context().connectorElements().getFirst());
activated.executable().deactivate();
log(
executable.id(),
Activity.newBuilder()
.withSeverity(Severity.INFO)
.withTag(ActivityLogTag.LIFECYCLE)
.withMessage(
"Deactivated executable: "
+ activated.context().getDefinition().type()
+ " with executable ID "
+ activated.id()));
} catch (Exception e) {
LOG.error("Failed to deactivate executable", e);
}
connectorsInboundMetrics.increaseDeactivation(
activated.context().connectorElements().getFirst());
}
}

/** Deactivates a batch of inbound connectors. */
public void deactivateBatch(List<RegisteredExecutable> executables) {
for (var executable : executables) {
deactivateSingle(executable);
}
}

/**
* Resets an executable by deactivating it (if currently active) and re-activating it with a fresh
* instance, reusing the same context. Only {@link Activated} and {@link
* RegisteredExecutable.Cancelled} states are supported.
*
* @param executable the executable to reset
* @return a {@link CompletableFuture} that resolves to the new {@link Activated} executable
* @throws IllegalStateException if the executable is not in a resettable state
*/
public CompletableFuture<Activated> restartFromContext(RegisteredExecutable executable) {
RegisteredExecutable.Cancelled cancelled =
switch (executable) {
case Activated activated -> {
LOG.info(
"Resetting activated inbound connector of type '{}'",
activated.context().getDefinition().type());
deactivateSingle(activated);
yield new RegisteredExecutable.Cancelled(
activated.executable(), activated.context(), null, activated.id());
}
Comment thread
johnBgood marked this conversation as resolved.
case RegisteredExecutable.Cancelled c -> {
LOG.info(
"Resetting cancelled inbound connector of type '{}'",
c.context().getDefinition().type());
yield c;
}
default ->
throw new IllegalStateException(
"Cannot reset connector in state: "
+ executable.getClass().getSimpleName()
+ ". Only Activated or Cancelled executables can be reset.");
};

return doRestartFromContext(cancelled, 0, Duration.ZERO);
}

public CompletableFuture<Activated> restartFromContext(
RegisteredExecutable.Cancelled cancelled, ConnectorRetryException retryException) {
return doRestartFromContext(
cancelled, retryException.getRetries(), retryException.getBackoffDuration());
}

private CompletableFuture<Activated> doRestartFromContext(
RegisteredExecutable.Cancelled cancelled, int maxRetries, Duration backoff) {
InboundConnectorExecutable<InboundConnectorContext> newExecutable =
connectorFactory.getInstance(cancelled.context().getDefinition().type());
LOG.warn("Inbound connector executable has requested its reactivation");
try {
RetryPolicy<Object> retryPolicy =
RetryPolicy.builder()
.withDelay(retryException.getBackoffDuration())
.withDelay(backoff)
.onFailedAttempt(
event ->
LOG.error(
Expand All @@ -254,7 +315,7 @@ public CompletableFuture<Activated> restartFromContext(
"Failure #{} to reactivate connector: {}. Retrying.",
event.getAttemptCount(),
cancelled.context().getDefinition().type()))
.withMaxRetries(retryException.getRetries())
.withMaxRetries(maxRetries)
.build();
return Failsafe.with(retryPolicy)
.getAsync(() -> tryRestart(newExecutable, cancelled.context()));
Expand All @@ -269,9 +330,9 @@ private Activated tryRestart(
try {
var executableId =
ExecutableId.fromDeduplicationId(context.getDefinition().deduplicationId());
executable.activate(context);
var activated = doActivate(executable, context, executableId);
LOG.info("Activation successful for {}", context.getDefinition().type());
return new Activated(executable, context, executableId);
return activated;
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Loading
Loading