Skip to content
Prev Previous commit
Next Next commit
Add implementation for method prepareBlocks in class ExternalShuffleC…
…lient
  • Loading branch information
biaoma-ty committed Aug 22, 2016
commit b49be73a476af75dd37c33378aef7352e0a4902c
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,35 @@ public void close() {

@Override
public void prepareBlocks(
String host,
int port,
String execId,
String[] prepareBlockIds,
String[] releaseBlocks,
BlockPreparingListener listener) {}
final String host,
final int port,
final String execId,
String[] prepareBlockIds,
final String[] releaseBlockIds,
BlockPreparingListener listener) {
logger.debug("send prepare block info to {}:{} (executor id {})", host, port, execId);

try {
RetryingBlockPreparer.PreparerStarter blockPrepareStarter = new RetryingBlockPreparer.PreparerStarter() {
@Override
public void createAndStart(String[] prepareBlockIds, String[] releaseBlocks, BlockPreparingListener listener) throws IOException {
TransportClient client = clientFactory.createClient(host, port);
new BlockToPrepareInfoSender(client, appId, execId, prepareBlockIds,
releaseBlockIds, listener).start();
}
};

int maxRetries = conf.maxIORetries();
if (maxRetries > 0) {
new RetryingBlockPreparer(conf, blockPrepareStarter, prepareBlockIds,
releaseBlockIds, listener).start();
} else {
blockPrepareStarter.createAndStart(prepareBlockIds, releaseBlockIds, listener);
}

} catch (Exception e) {
logger.error("Exception while sending the block list", e);
listener.onBlockPrepareFailure(e);
}
}
}