Skip to content
Prev Previous commit
Next Next commit
Fix typo error
  • Loading branch information
biaoma-ty committed Aug 23, 2016
commit 09ab2789baf95e47653a87c0bb35d9c979ecf5c7
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@
package org.apache.spark.network.shuffle;

public interface BlockPreparingListener {
void onBlockPrepareSuccess();
void onBlockPrepareFailure(Throwable exception);
void onBlockPrepareSuccess();
void onBlockPrepareFailure(Throwable exception);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,58 +31,58 @@


public class BlockToPrepareInfoSender {
private final Logger logger = LoggerFactory.getLogger(BlockToPrepareInfoSender.class);
private final Logger logger = LoggerFactory.getLogger(BlockToPrepareInfoSender.class);

private final TransportClient client;
private final PrepareBlocks prepareMessage;
private final String[] blockIds;
private final String[] blocksToRelease;
private final BlockPreparingListener listener;
private final PrepareRequestReceivedCallBack requestReceivedCallBack;
private final TransportClient client;
private final PrepareBlocks prepareMessage;
private final String[] blockIds;
private final String[] blocksToRelease;
private final BlockPreparingListener listener;
private final PrepareRequestReceivedCallBack requestReceivedCallBack;

public BlockToPrepareInfoSender(TransportClient client,
String appId,
String execId,
String[] blockIds,
String[] blocksToRelease,
BlockPreparingListener listener) {
this.client = client;
this.prepareMessage = new PrepareBlocks(appId, execId, blockIds, blocksToRelease);
this.blockIds = blockIds;
this.blocksToRelease = blocksToRelease;
this.listener = listener;
this.requestReceivedCallBack = new PrepareCallBack();
}
public BlockToPrepareInfoSender(
TransportClient client,
String appId,
String execId,
String[] blockIds,
String[] blocksToRelease,
BlockPreparingListener listener) {
this.client = client;
this.prepareMessage = new PrepareBlocks(appId, execId, blockIds, blocksToRelease);
this.blockIds = blockIds;
this.blocksToRelease = blocksToRelease;
this.listener = listener;
this.requestReceivedCallBack = new PrepareCallBack();
}

private class PrepareCallBack implements PrepareRequestReceivedCallBack {
@Override
public void onSuccess() {
listener.onBlockPrepareSuccess();
}
private class PrepareCallBack implements PrepareRequestReceivedCallBack {
@Override
public void onSuccess() {
listener.onBlockPrepareSuccess();
}

@Override
public void onFailure(Throwable e) {
listener.onBlockPrepareFailure(e);
}
@Override
public void onFailure(Throwable e) {
listener.onBlockPrepareFailure(e);
}
}

public void start() {
if (blockIds.length == 0) {
// throw new IllegalArgumentException("Zero-size blockIds array");
logger.warn("Zero-size blockIds array");
}
public void start() {
if (blockIds.length == 0) {
// throw new IllegalArgumentException("Zero-size blockIds array");
logger.warn("Zero-size blockIds array");
}

logger.debug("PrepareMessageSender start method called");
client.sendRpc(prepareMessage.toByteBuffer(), new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
logger.debug("Successfully send prepare block's info, ready for the next step");
}
client.sendRpc(prepareMessage.toByteBuffer(), new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
logger.debug("Successfully send prepare block's info, ready for the next step");
}

@Override
public void onFailure(Throwable e) {
logger.error("Failed while send the prepare message");
}
});
}
@Override
public void onFailure(Throwable e) {
logger.error("Failed while send the prepare message");
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,13 @@ public void close() {

@Override
public void prepareBlocks(
final String host,
final int port,
final String execId,
String[] prepareBlockIds,
final String[] releaseBlockIds,
BlockPreparingListener listener) {
logger.debug("send prepare block info to {}:{} (executor id {})", host, port, execId);
final String host,
final int port,
final String execId,
String[] prepareBlockIds,
final String[] releaseBlockIds,
BlockPreparingListener listener) {
logger.debug("Send prepare block info to {}:{} (executor id {})", host, port, execId);

try {
RetryingBlockPreparer.PreparerStarter blockPrepareStarter = new RetryingBlockPreparer.PreparerStarter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,134 +33,135 @@

public class RetryingBlockPreparer {

public static interface PreparerStarter {
void createAndStart(String[] prepareBlockIds, String[] releaseBlocks, BlockPreparingListener listener) throws IOException;
public static interface PreparerStarter {
void createAndStart(String[] prepareBlockIds, String[] releaseBlocks, BlockPreparingListener listener) throws IOException;
}

private static final ExecutorService executorService = Executors.newCachedThreadPool(
NettyUtils.createThreadFactory("Prepare Info Send Retry")
);

private final Logger logger = LoggerFactory.getLogger(RetryingBlockPreparer.class);

private final PreparerStarter preparerStarter;

private final BlockPreparingListener listener;

private final int maxRetries;

private final int retryWaitTime;

private int retryCount = 0;

private final LinkedHashSet<String> outstandingBlockInfosForPrepare;

private final LinkedHashSet<String> outStandingBlockInfosForRelease;

private RetryingBlockPreparerListener currentListener;

public RetryingBlockPreparer(
TransportConf conf,
PreparerStarter prepareStarter,
String[] prepareBlockIds,
String[] releaseBlockIds,
BlockPreparingListener listener) {
this.preparerStarter = prepareStarter;
this.listener = listener;
this.maxRetries = conf.maxIORetries();
this.retryWaitTime = conf.ioRetryWaitTimeMs();
this.outstandingBlockInfosForPrepare = Sets.newLinkedHashSet();
this.outStandingBlockInfosForRelease = Sets.newLinkedHashSet();
Collections.addAll(outstandingBlockInfosForPrepare, prepareBlockIds);
Collections.addAll(outStandingBlockInfosForRelease, releaseBlockIds);
this.currentListener = new RetryingBlockPreparerListener();
}

public void start(){
senAllOutStanding();
}

private void senAllOutStanding() {
String[] blockIdsToSendForPrepare;
String[] blockIdsToSendForRelease;
int numRetries;
RetryingBlockPreparerListener myListener;
synchronized (this) {
blockIdsToSendForPrepare = outstandingBlockInfosForPrepare.toArray(new String[outstandingBlockInfosForPrepare.size()]);
blockIdsToSendForRelease = outStandingBlockInfosForRelease.toArray(new String[outStandingBlockInfosForRelease.size()]);
numRetries = retryCount;
myListener = currentListener;
}

private static final ExecutorService executorService = Executors.newCachedThreadPool(
NettyUtils.createThreadFactory("Prepare Info Send Retry")
);

private final Logger logger = LoggerFactory.getLogger(RetryingBlockPreparer.class);

private final PreparerStarter preparerStarter;

private final BlockPreparingListener listener;

private final int maxRetries;

private final int retryWaitTime;

private int retryCount = 0;

private final LinkedHashSet<String> outstandingBlockInfosForPrepare;

private final LinkedHashSet<String> outStandingBlockInfosForRelease;

private RetryingBlockPreparerListener currentListener;

public RetryingBlockPreparer(
TransportConf conf,
PreparerStarter prepareStarter,
String[] prepareBlockIds,
String[] releaseBlockIds,
BlockPreparingListener listener) {
this.preparerStarter = prepareStarter;
this.listener = listener;
this.maxRetries = conf.maxIORetries();
this.retryWaitTime = conf.ioRetryWaitTimeMs();
this.outstandingBlockInfosForPrepare = Sets.newLinkedHashSet();
this.outStandingBlockInfosForRelease = Sets.newLinkedHashSet();
Collections.addAll(outstandingBlockInfosForPrepare, prepareBlockIds);
Collections.addAll(outStandingBlockInfosForRelease, releaseBlockIds);
this.currentListener = new RetryingBlockPreparerListener();
try {
preparerStarter.createAndStart(blockIdsToSendForPrepare, blockIdsToSendForRelease ,myListener);
listener.onBlockPrepareSuccess();
} catch (Exception e) {
logger.error(String.format("Exception while begin send %s outstanding block info %s",
blockIdsToSendForPrepare.length, numRetries > 0 ? "(after )" + numRetries + "retries)" : ""), e);
if (shouldRetry(e)) {
initiateRetry();
} else {
for (String bid: blockIdsToSendForPrepare) {
listener.onBlockPrepareFailure(e);
}
}
}

public void start(){
}

private synchronized void initiateRetry(){
retryCount += 1;
currentListener = new RetryingBlockPreparerListener();
logger.info("Retrying send ({}/{}) for {} outstading_prepare and release blocks after {} ms",
retryCount, maxRetries, outstandingBlockInfosForPrepare.size()+outStandingBlockInfosForRelease.size(), retryWaitTime);

executorService.submit(new Runnable() {
@Override
public void run() {
Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
senAllOutStanding();
}

private void senAllOutStanding() {
String[] blockIdsToSendForPrepare;
String[] blockIdsToSendForRelease;
int numRetries;
RetryingBlockPreparerListener myListener;
synchronized (this) {
blockIdsToSendForPrepare = outstandingBlockInfosForPrepare.toArray(new String[outstandingBlockInfosForPrepare.size()]);
blockIdsToSendForRelease = outStandingBlockInfosForRelease.toArray(new String[outStandingBlockInfosForRelease.size()]);
numRetries = retryCount;
myListener = currentListener;
}
});
}

private synchronized boolean shouldRetry(Throwable e) {
boolean isIOException = e instanceof IOException
|| (e.getCause() != null
&& e.getCause() instanceof IOException);
boolean hasRemainRetries = retryCount < maxRetries;
return isIOException && hasRemainRetries;
}

private class RetryingBlockPreparerListener implements BlockPreparingListener {
@Override
public void onBlockPrepareSuccess() {
boolean shouldForwardSuccess = false;
synchronized (RetryingBlockPreparer.this) {
if (this == currentListener) {
shouldForwardSuccess = true;
}

try {
preparerStarter.createAndStart(blockIdsToSendForPrepare, blockIdsToSendForRelease ,myListener);
listener.onBlockPrepareSuccess();
} catch (Exception e) {
logger.error(String.format("Exception while begin send %s outstanding block info %s",
blockIdsToSendForPrepare.length, numRetries > 0 ? "(after )" + numRetries + "retries)" : ""), e);
if (shouldRetry(e)) {
initiateRetry();
} else {
for (String bid: blockIdsToSendForPrepare) {
listener.onBlockPrepareFailure(e);
}
}
}

if (shouldForwardSuccess) {
listener.onBlockPrepareSuccess();
}
}

@Override
public void onBlockPrepareFailure(Throwable exception) {
boolean shouldForwardFailure = false;
synchronized (RetryingBlockPreparer.this) {
if (this == currentListener) {
initiateRetry();
} else {
logger.error(String.format("PrepareBlock failed to send blocks' info, " +
"and will not retry (%s retries)", retryCount), exception);
shouldForwardFailure = true;
}
}

private synchronized void initiateRetry(){
retryCount += 1;
currentListener = new RetryingBlockPreparerListener();
logger.info("Retrying send ({}/{}) for {} outstading_prepare and release blocks after {} ms",
retryCount, maxRetries, outstandingBlockInfosForPrepare.size()+outStandingBlockInfosForRelease.size(), retryWaitTime);

executorService.submit(new Runnable() {
@Override
public void run() {
Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
senAllOutStanding();
}
});
}
}

private synchronized boolean shouldRetry(Throwable e) {
boolean isIOException = e instanceof IOException
|| (e.getCause() != null && e.getCause() instanceof IOException);
boolean hasRemainRetries = retryCount < maxRetries;
return isIOException && hasRemainRetries;
}

private class RetryingBlockPreparerListener implements BlockPreparingListener {
@Override
public void onBlockPrepareSuccess() {
boolean shouldForwardSuccess = false;
synchronized (RetryingBlockPreparer.this) {
if (this == currentListener) {
shouldForwardSuccess = true;
}
}

if (shouldForwardSuccess) {
listener.onBlockPrepareSuccess();
}
}

@Override
public void onBlockPrepareFailure(Throwable exception) {
boolean shouldForwardFailure = false;
synchronized (RetryingBlockPreparer.this) {
if (this == currentListener) {
initiateRetry();
} else {
logger.error(String.format("PrepareBlock failed to send blocks' info, " +
"and will not retry (%s retries)", retryCount), exception);
shouldForwardFailure = true;
}
}

if (shouldForwardFailure) {
listener.onBlockPrepareFailure(exception);
}
}
if (shouldForwardFailure) {
listener.onBlockPrepareFailure(exception);
}
}
}
}
Loading