Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.azure.core.util.polling;

import java.util.Objects;

public class OperationStatus {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving it into separate class since now, it will have its own POJO like structure as we are supporting OTHER state and must have String value for custom other state.

/**
* An enum to represent all possible states that a long-running operation may find itself in.
* The poll operation is considered complete when the status is one of {@code SUCCESSFULLY_COMPLETED}, {@code USER_CANCELLED} or {@code FAILED}.
* The state {@code OTHER} does not represent long-running operation is complete. This is one of the custom intermediate state.
*/
public enum State {
/**
* Represents that polling has not yet started for this long-running operation.
*/
NOT_STARTED,

/**
* Represents that this long-running operation is in progress and not yet complete.
*/
IN_PROGRESS,

/**
* Represent that this long-running operation is completed successfully.
*/
SUCCESSFULLY_COMPLETED,

/**
* Represents that this long-running operation has failed to successfully complete, however this is still
* considered as complete long-running operation, meaning that the {@link Poller} instance will report that it is complete.
*/
FAILED,

/**
* Represents that this long-running operation is cancelled by user, however this is still
* considered as complete long-running operation.
*/
USER_CANCELLED,

/**
* When long-running operation state could not be represented by any state in {@link State}, this state represents
* a custom state Azure service could be in. This custom state is not considered as complete long-running operation.
* It must have valid value for {@code otherStatus} as {@link String}.
*/
OTHER
}
private State state;
private String otherStatus;

/**
*
* @param state in which the long-running operation find itself in.
*/
public OperationStatus(State state) {
this.state = state;
}

/**
* This is normally used for {@link State#OTHER}.
* @param state in which long-running operation find itself in.
* @param otherStatus The string representation of custom state the long-running operation find itself in.
*/
public OperationStatus(State state, String otherStatus) {
this(state);
if (state == State.OTHER ) {
if ( Objects.isNull(otherStatus) || otherStatus.trim().length() == 0 ) {
throw new IllegalArgumentException("otherStatus can not be empty or null for State.OTHER");
}else {
this.otherStatus = otherStatus;
}
}
}

public State getState(){
return this.state;
}

public String getOtherStatus(){
return this.otherStatus;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,6 @@ public final class PollResponse<T> {
private final Duration retryAfter;
private final Map<Object, Object> properties;

/**
* An enum to represent all possible states that a long-running operation may find itself in.
* The poll operation is considered complete when the status is one of {@code SUCCESSFULLY_COMPLETED}, {@code USER_CANCELLED} or {@code FAILED}.
*/
public enum OperationStatus {
/** Represents that polling has not yet started for this long-running operation.*/
NOT_STARTED,

/** Represents that this long-running operation is in progress and not yet complete.*/
IN_PROGRESS,

/** Represent that this long-running operation is completed successfully.*/
SUCCESSFULLY_COMPLETED,

/**
* Represents that this long-running operation has failed to successfully complete, however this is still
* considered as complete long-running operation, meaning that the {@link Poller} instance will report that it is complete.
*/
FAILED,

/** Represents that this long-running operation is cancelled by user, however this is still
* considered as complete long-running operation.*/
USER_CANCELLED
}

/**
* Creates a new {@link PollResponse} with status, value, retryAfter and properties.
*
Expand Down Expand Up @@ -103,6 +78,19 @@ public PollResponse(OperationStatus status, T value) {
this(status, value, null);
}

/**
* Creates a new {@link PollResponse} with status and value.
*
*<p><strong>Code Sample Creating PollResponse Object</strong></p>
* {@codesnippet com.azure.core.util.polling.pollresponse.status.value}
*
* @param status Mandatory operation status as defined in {@link OperationStatus}.
* @throws NullPointerException If {@code status} is {@code null}.
*/
public PollResponse(OperationStatus status) {
this(status, null);
}

/**
* Represents the status of the long-running operation at the time the last polling operation finished successfully.
* @return A {@link OperationStatus} representing the result of the poll operation.
Expand Down
103 changes: 76 additions & 27 deletions core/azure-core/src/main/java/com/azure/core/util/polling/Poller.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
package com.azure.core.util.polling;

import reactor.core.Disposable;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -17,28 +18,28 @@
* This class offers API that simplifies the task of executing long-running operations against Azure service.
* The {@link Poller} consist of poll operation, cancel operation if supported by Azure service and polling interval.
* <p>
* It provides the following functionality:
* It provides the following functionality:
*
* <ul>
* <li>Querying the current state of long-running operations.</li>
* <li>Requesting an asynchronous notification for long-running operation's state.</li>
* <li>Cancelling the long-running operation if cancellation is supported by the service.</li>
* <li>Triggering a poll operation manually.</li>
* <li>Enable/Disable auto-polling.</li>
* <li>Querying the current state of long-running operations.</li>
* <li>Requesting an asynchronous notification for long-running operation's state.</li>
* <li>Cancelling the long-running operation if cancellation is supported by the service.</li>
* <li>Triggering a poll operation manually.</li>
* <li>Enable/Disable auto-polling.</li>
* </ul>
*
* <p><strong>Auto Polling</strong></p>
* Auto-polling is enabled by-default. It means that the {@link Poller} starts polling as soon as its instance is created. The {@link Poller} will transparently call the poll operation every polling cycle
* and track the state of the long-running operation. Azure services can return {@link PollResponse#getRetryAfter()} to override the {@code Poller.pollInterval} defined in the {@link Poller}.
* The {@link Poller#getStatus()} represents the status returned by the successful long-running operation at the time the last auto-polling or last manual polling, whichever happened most recently.
*
*<p><strong>Disable Auto Polling</strong></p>
* <p><strong>Disable Auto Polling</strong></p>
* For those scenarios which require manual control of the polling cycle, disable auto-poling by calling {@code setAutoPollingEnabled#false} and perform manual poll
* by invoking {@link Poller#poll()} function. It will call poll operation once and update the {@link Poller} with the latest status.
* <p>When auto-polling is disabled, the {@link Poller} will not update its status or other information, unless manual polling is triggered by calling {@link Poller#poll()} function.
*
* <p>The {@link Poller} will stop polling when the long-running operation is complete or it is disabled. The polling is considered complete
* based on status defined in {@link com.azure.core.util.polling.PollResponse.OperationStatus}.
* based on status defined in {@link com.azure.core.util.polling.OperationStatus}.
*
* <p><strong>Code Samples</strong></p>
*
Expand All @@ -53,7 +54,7 @@
*
* @param <T> Type of poll response value
* @see PollResponse
* @see com.azure.core.util.polling.PollResponse.OperationStatus
* @see com.azure.core.util.polling.OperationStatus
*/
public class Poller<T> {

Expand Down Expand Up @@ -106,10 +107,10 @@ public class Poller<T> {
* <p><strong>Code Sample - Create poller object</strong></p>
* {@codesnippet com.azure.core.util.polling.poller.initialize.interval.polloperation}
*
* @param pollInterval Not-null and greater than zero poll interval.
* @param pollInterval Not-null and greater than zero poll interval.
* @param pollOperation The polling operation to be called by the {@link Poller} instance. This is a callback into the client library,
* which must never return {@code null}, and which must always have a non-null {@link com.azure.core.util.polling.PollResponse.OperationStatus}.
*{@link Mono} returned from poll operation should never return {@link Mono#error(Throwable)}.If any unexpected scenario happens in poll operation,
* which must never return {@code null}, and which must always have a non-null {@link com.azure.core.util.polling.OperationStatus}.
* {@link Mono} returned from poll operation should never return {@link Mono#error(Throwable)}.If any unexpected scenario happens in poll operation,
* it should be handled by client library and return a valid {@link PollResponse}. However if poll operation returns {@link Mono#error(Throwable)},
* the {@link Poller} will disregard that and continue to poll.
* @throws NullPointerException If {@code pollInterval} or {@code pollOperation} are {@code null}.
Expand All @@ -124,7 +125,7 @@ public Poller(Duration pollInterval, Function<PollResponse<T>, Mono<PollResponse

this.pollInterval = pollInterval;
this.pollOperation = pollOperation;
this.pollResponse = new PollResponse<>(PollResponse.OperationStatus.NOT_STARTED, null);
this.pollResponse = new PollResponse<>(new OperationStatus(OperationStatus.State.NOT_STARTED));

this.fluxHandle = asyncPollRequestWithDelay()
.flux()
Expand All @@ -142,15 +143,15 @@ public Poller(Duration pollInterval, Function<PollResponse<T>, Mono<PollResponse
* The next poll cycle will be defined by retryAfter value in {@link PollResponse}.
* In absence of {@link PollResponse#getRetryAfter()}, the {@link Poller} will use {@code pollInterval}.
*
* @param pollInterval Not-null and greater than zero poll interval.
* @param pollOperation The polling operation to be called by the {@link Poller} instance. This is a callback into the client library,
* which must never return {@code null}, and which must always have a non-null {@link com.azure.core.util.polling.PollResponse.OperationStatus}.
*{@link Mono} returned from poll operation should never return {@link Mono#error(Throwable)}.If any unexpected scenario happens in poll operation,
* @param pollInterval Not-null and greater than zero poll interval.
* @param pollOperation The polling operation to be called by the {@link Poller} instance. This is a callback into the client library,
* which must never return {@code null}, and which must always have a non-null {@link com.azure.core.util.polling.OperationStatus}.
* {@link Mono} returned from poll operation should never return {@link Mono#error(Throwable)}.If any unexpected scenario happens in poll operation,
* it should handle it and return a valid {@link PollResponse}. However if poll operation returns {@link Mono#error(Throwable)},
* the {@link Poller} will disregard that and continue to poll.
* @param cancelOperation cancel operation if cancellation is supported by the service. It can be {@code null} which will indicate to the {@link Poller}
* that cancel operation is not supported by Azure service.
* @throws NullPointerException If {@code pollInterval} or {@code pollOperation} are {@code null}.
* @throws NullPointerException If {@code pollInterval} or {@code pollOperation} are {@code null}.
* @throws IllegalArgumentException if {@code pollInterval} is less than or equal to zero.
*/
public Poller(Duration pollInterval, Function<PollResponse<T>, Mono<PollResponse<T>>> pollOperation, Consumer<Poller> cancelOperation) {
Expand All @@ -162,7 +163,7 @@ public Poller(Duration pollInterval, Function<PollResponse<T>, Mono<PollResponse
* Attempts to cancel the long-running operation that this {@link Poller} represents. This is possible only if the service supports it,
* otherwise an {@code UnsupportedOperationException} will be thrown.
* <p>
* It will call cancelOperation if status is {@link com.azure.core.util.polling.PollResponse.OperationStatus#IN_PROGRESS} otherwise it does nothing.
* It will call cancelOperation if status is {@link OperationStatus.State#IN_PROGRESS} otherwise it does nothing.
*
* @throws UnsupportedOperationException when cancel operation is not provided.
*/
Expand All @@ -173,18 +174,65 @@ public void cancelOperation() throws UnsupportedOperationException {

// We can not cancel an operation if it was never started
// It only make sense to call cancel operation if current status IN_PROGRESS.
if (this.pollResponse != null && this.pollResponse.getStatus() != PollResponse.OperationStatus.IN_PROGRESS) {
if (this.pollResponse != null && this.pollResponse.getStatus().getState() != OperationStatus.State.IN_PROGRESS) {
return;
}

//Time to call cancel
this.cancelOperation.accept(this);
}

/**
* This method returns an updated {@link Flux} that is observing specified {@link com.azure.core.util.polling.OperationStatus.State} and
* can be subscribed to, enabling a subscriber to receive these specific notification of {@link PollResponse}, as it is received.
* @param observeState which user want to observe, must specify complete {@link com.azure.core.util.polling.OperationStatus.State} if interested.
* User not required to specify {@link com.azure.core.util.polling.OperationStatus.State#OTHER} here if interested in other state.
* @param observeOtherStates which user is interested to observe.
* @return A {@link Flux} that can be subscribed to receive poll responses as the long-running operation executes.
*/
public Flux<PollResponse<T>> getObserver(List<OperationStatus.State> observeState, List<String> observeOtherStates) {
if (observeState == null && (observeOtherStates == null || observeOtherStates.size() == 0)) {
throw new IllegalArgumentException("observeOperationStates and observeStates both can not be null or empty.");
}
//TODO : Design discussion, We may decide to add complete state. If user did not added them, we may never listen them and LRO would never complete.
//observeState.add(OperationStatus.State.SUCCESSFULLY_COMPLETED);
//observeState.add(OperationStatus.State.USER_CANCELLED);
//observeState.add(OperationStatus.State.FAILED);
this.fluxHandle = this.fluxHandle.filterWhen(tPollResponse -> matchesState(tPollResponse, observeState, observeOtherStates));
return this.fluxHandle;
}

/*
* Matches {@Code currentPollResponse} with state which user want to observe.
* @param currentPollResponse
* @param observeOperationStates
* @param observeOtherStates
* @return
*/
private Mono<Boolean> matchesState(PollResponse<T> currentPollResponse, List<OperationStatus.State> observeOperationStates, List<String> observeOtherStates) {
List<OperationStatus.State> operationStates = observeOperationStates != null ? observeOperationStates : new ArrayList<>();

if (currentPollResponse.getStatus().getState() == OperationStatus.State.OTHER &&
currentPollResponse.getStatus().getOtherStatus() != null &&
observeOtherStates != null) {
if (observeOtherStates.contains(currentPollResponse.getStatus().getOtherStatus())) {
return Mono.just(true);
}
} else {
if (operationStates.contains(currentPollResponse.getStatus().getState())) {
return Mono.just(true);
}
}
return Mono.just(false);
}

/**
* This method returns a {@link Flux} that can be subscribed to, enabling a subscriber to receive notification of
* every {@link PollResponse}, as it is received.
*
* This will return updated {@link Flux} if user had made call to {@link Poller#getObserver(List, List)} earlier and will selectively receive notification for
* only those {@link com.azure.core.util.polling.OperationStatus.State} specified in {@link Poller#getObserver(List, List)}.
*
* @return A {@link Flux} that can be subscribed to receive poll responses as the long-running operation executes.
*/
public Flux<PollResponse<T>> getObserver() {
Expand Down Expand Up @@ -214,10 +262,10 @@ public Mono<PollResponse<T>> poll() {
}

/**
* Blocks execution and wait for polling to complete. The polling is considered complete based on status defined in {@link com.azure.core.util.polling.PollResponse.OperationStatus}.
* Blocks execution and wait for polling to complete. The polling is considered complete based on status defined in {@link com.azure.core.util.polling.OperationStatus}.
* <p>It will enable auto-polling if it was disable by user.
*
* @return returns final {@link PollResponse} when polling is complete as defined in {@link com.azure.core.util.polling.PollResponse.OperationStatus}.
* @return returns final {@link PollResponse} when polling is complete as defined in {@link com.azure.core.util.polling.OperationStatus}.
*/
public PollResponse<T> block() {
if (!isAutoPollingEnabled()) {
Expand Down Expand Up @@ -295,9 +343,9 @@ public final void setAutoPollingEnabled(boolean autoPollingEnabled) {
* @return true if operation is done/complete.
*/
private boolean hasCompleted() {
return pollResponse != null && (pollResponse.getStatus() == PollResponse.OperationStatus.SUCCESSFULLY_COMPLETED
|| pollResponse.getStatus() == PollResponse.OperationStatus.FAILED
|| pollResponse.getStatus() == PollResponse.OperationStatus.USER_CANCELLED);
return pollResponse != null && (pollResponse.getStatus().getState() == OperationStatus.State.SUCCESSFULLY_COMPLETED
|| pollResponse.getStatus().getState() == OperationStatus.State.FAILED
|| pollResponse.getStatus().getState() == OperationStatus.State.USER_CANCELLED);
}

/*
Expand All @@ -309,6 +357,7 @@ private boolean activeSubscriber() {

/**
* Indicates if auto polling is enabled. Refer to the {@link Poller} class-level JavaDoc for more details on auto-polling.
*
* @return A boolean value representing if auto-polling is enabled or not..
*/
public boolean isAutoPollingEnabled() {
Expand All @@ -320,7 +369,7 @@ public boolean isAutoPollingEnabled() {
*
* @return current status or {@code null} if no status is available.
*/
public PollResponse.OperationStatus getStatus() {
public OperationStatus getStatus() {
return this.pollResponse != null ? this.pollResponse.getStatus() : null;
}
}
Loading