Commit e3612e4
HDFS-6595. Allow the maximum threads for balancing on datanodes to be configurable. Contributed by Benoy Antony
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1605565 13f79535-47bb-0310-9956-ffa450edef68
Parent: 1a3a7e0

5 files changed (+52, -15 lines)

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

Lines changed: 3 additions & 0 deletions

@@ -476,6 +476,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6593. Move SnapshotDiffInfo out of INodeDirectorySnapshottable.
     (Jing Zhao via wheat9)
 
+    HDFS-6595. Allow the maximum threads for balancing on datanodes to be
+    configurable. (Benoy Antony via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

Lines changed: 2 additions & 0 deletions

@@ -105,6 +105,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address";
   public static final String DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = "dfs.datanode.balance.bandwidthPerSec";
   public static final long DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024;
+  public static final String DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY = "dfs.datanode.balance.max.concurrent.moves";
+  public static final int DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT = 5;
   public static final String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
   public static final long DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB
   public static final String DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes";
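Note: the two new constants name the property and its default. A minimal sketch (not part of this commit; the class name is hypothetical) of reading the limit the same way the Balancer and DataXceiverServer hunks below do:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class ConcurrentMovesConfigSketch {
  public static void main(String[] args) {
    // HdfsConfiguration also loads hdfs-default.xml and hdfs-site.xml,
    // so a cluster-level override of the property is picked up here.
    Configuration conf = new HdfsConfiguration();
    // Falls back to the compiled-in default of 5 when the property is unset.
    int maxMoves = conf.getInt(
        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
    System.out.println("dfs.datanode.balance.max.concurrent.moves = " + maxMoves);
  }
}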

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

Lines changed: 17 additions & 9 deletions

@@ -189,7 +189,6 @@ public class Balancer {
   /** The maximum number of concurrent blocks moves for
    * balancing purpose at a datanode
    */
-  public static final int MAX_NUM_CONCURRENT_MOVES = 5;
   private static final int MAX_NO_PENDING_BLOCK_ITERATIONS = 5;
   public static final long DELAY_AFTER_ERROR = 10 * 1000L; //10 seconds
   public static final int BLOCK_MOVE_READ_TIMEOUT=20*60*1000; // 20 minutes
@@ -231,6 +230,7 @@ public class Balancer {
 
   private final ExecutorService moverExecutor;
   private final ExecutorService dispatcherExecutor;
+  private final int maxConcurrentMovesPerNode;
 
   /* This class keeps track of a scheduled block move */
   private class PendingBlockMove {
@@ -516,8 +516,8 @@ private static class BalancerDatanode {
     private long scheduledSize = 0L;
     protected long delayUntil = 0L;
     // blocks being moved but not confirmed yet
-    private final List<PendingBlockMove> pendingBlocks =
-        new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES);
+    private final List<PendingBlockMove> pendingBlocks;
+    private final int maxConcurrentMoves;
 
     @Override
     public String toString() {
@@ -528,7 +528,8 @@ public String toString() {
     /* Constructor
      * Depending on avgutil & threshold, calculate maximum bytes to move
      */
-    private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold) {
+    private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold,
+        int maxConcurrentMoves) {
       datanode = node;
       utilization = policy.getUtilization(node);
       final double avgUtil = policy.getAvgUtilization();
@@ -545,6 +546,8 @@ private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double thres
         maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove);
       }
       this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+      this.maxConcurrentMoves = maxConcurrentMoves;
+      this.pendingBlocks = new ArrayList<PendingBlockMove>(maxConcurrentMoves);
     }
 
     /** Get the datanode */
@@ -606,7 +609,7 @@ synchronized private boolean isDelayActive() {
 
     /* Check if the node can schedule more blocks to move */
     synchronized private boolean isPendingQNotFull() {
-      if ( pendingBlocks.size() < MAX_NUM_CONCURRENT_MOVES ) {
+      if ( pendingBlocks.size() < this.maxConcurrentMoves ) {
        return true;
      }
      return false;
@@ -655,8 +658,9 @@ public void run() {
        = new ArrayList<BalancerBlock>();
 
    /* constructor */
-    private Source(DatanodeInfo node, BalancingPolicy policy, double threshold) {
-      super(node, policy, threshold);
+    private Source(DatanodeInfo node, BalancingPolicy policy, double threshold,
+        int maxConcurrentMoves) {
+      super(node, policy, threshold, maxConcurrentMoves);
    }
 
    /** Add a node task */
@@ -869,6 +873,9 @@ private static void checkReplicationPolicyCompatibility(Configuration conf
    this.dispatcherExecutor = Executors.newFixedThreadPool(
            conf.getInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
                        DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
+    this.maxConcurrentMovesPerNode =
+        conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+            DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
  }
 
  /* Given a data node set, build a network topology and decide
@@ -908,7 +915,7 @@ private long initNodes(DatanodeInfo[] datanodes) {
      BalancerDatanode datanodeS;
      final double avg = policy.getAvgUtilization();
      if (policy.getUtilization(datanode) >= avg) {
-        datanodeS = new Source(datanode, policy, threshold);
+        datanodeS = new Source(datanode, policy, threshold, maxConcurrentMovesPerNode);
        if (isAboveAvgUtilized(datanodeS)) {
          this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
        } else {
@@ -919,7 +926,8 @@ private long initNodes(DatanodeInfo[] datanodes) {
              -threshold)*datanodeS.datanode.getCapacity()/100.0);
        }
      } else {
-        datanodeS = new BalancerDatanode(datanode, policy, threshold);
+        datanodeS = new BalancerDatanode(datanode, policy, threshold,
+            maxConcurrentMovesPerNode);
        if ( isBelowOrEqualAvgUtilized(datanodeS)) {
          this.belowAvgUtilizedDatanodes.add(datanodeS);
        } else {
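With the constructor change above, the Balancer reads dfs.datanode.balance.max.concurrent.moves once and threads the value through Source and BalancerDatanode, which sizes each node's pendingBlocks list. A hedged usage sketch, mirroring the new tests further down (it assumes the org.apache.hadoop.hdfs.server.balancer package, since the tests call Balancer.run(...) and Balancer.Parameters.DEFALUT from there; the class name is hypothetical):

package org.apache.hadoop.hdfs.server.balancer;

import java.net.URI;
import java.util.Collection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class BalancerMoveLimitSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // Ask for up to 8 concurrent block moves per datanode on this run.
    conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 8);
    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
    // Same entry point the updated TestBalancer uses.
    int exitCode = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
    System.exit(exitCode);
  }
}

Because each DataNode's DataXceiverServer reads the same key (next file), raising the value only on the balancer side does not let a datanode exceed its own configured limit; both sides should be configured consistently.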

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java

Lines changed: 9 additions & 4 deletions

@@ -63,14 +63,17 @@ class DataXceiverServer implements Runnable {
    */
   static class BlockBalanceThrottler extends DataTransferThrottler {
     private int numThreads;
+    private int maxThreads;
 
     /**Constructor
      *
      * @param bandwidth Total amount of bandwidth can be used for balancing
      */
-    private BlockBalanceThrottler(long bandwidth) {
+    private BlockBalanceThrottler(long bandwidth, int maxThreads) {
       super(bandwidth);
+      this.maxThreads = maxThreads;
       LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
+      LOG.info("Number threads for balancing is "+ maxThreads);
     }
 
     /** Check if the block move can start.
@@ -79,7 +82,7 @@ private BlockBalanceThrottler(long bandwidth) {
      * the counter is incremented; False otherwise.
      */
     synchronized boolean acquire() {
-      if (numThreads >= Balancer.MAX_NUM_CONCURRENT_MOVES) {
+      if (numThreads >= maxThreads) {
         return false;
       }
       numThreads++;
@@ -120,8 +123,10 @@ synchronized void release() {
 
     //set up parameter for cluster balancing
     this.balanceThrottler = new BlockBalanceThrottler(
-        conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
-            DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT));
+        conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
+            DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT),
+        conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+            DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT));
   }
 
   @Override
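The acquire()/release() pair above is a plain counting guard: a balancing move may start only while fewer than maxThreads moves are in flight on the datanode. A simplified standalone sketch of that pattern (illustration only; the real BlockBalanceThrottler also extends DataTransferThrottler to limit bandwidth):

// Simplified illustration of the counting guard used by BlockBalanceThrottler.
class MoveSlotGuard {
  private final int maxThreads;   // e.g. dfs.datanode.balance.max.concurrent.moves
  private int numThreads;         // moves currently in flight

  MoveSlotGuard(int maxThreads) {
    this.maxThreads = maxThreads;
  }

  /** Try to claim a slot; returns false when the datanode is already at its limit. */
  synchronized boolean acquire() {
    if (numThreads >= maxThreads) {
      return false;
    }
    numThreads++;
    return true;
  }

  /** Give the slot back once the block move finishes or fails. */
  synchronized void release() {
    numThreads--;
  }
}

Setting the property to 0 makes acquire() always return false, so the datanode rejects every move; the new testBalancerWithZeroThreadsForMove below relies on this to expect ReturnStatus.NO_MOVE_PROGRESS.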

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java

Lines changed: 21 additions & 2 deletions

@@ -370,8 +370,13 @@ private void runBalancer(Configuration conf,
     // start rebalancing
     Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
     final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
-    assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
-
+    if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) {
+      assertEquals(Balancer.ReturnStatus.NO_MOVE_PROGRESS.code, r);
+      return;
+    } else {
+      assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
+    }
     waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
     LOG.info("Rebalancing with default ctor.");
     waitForBalancer(totalUsedSpace, totalCapacity, client, cluster);
@@ -462,6 +467,20 @@ void testBalancer1Internal(Configuration conf) throws Exception {
         new String[] {RACK0, RACK1});
   }
 
+  @Test(timeout=100000)
+  public void testBalancerWithZeroThreadsForMove() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 0);
+    testBalancer1Internal (conf);
+  }
+
+  @Test(timeout=100000)
+  public void testBalancerWithNonZeroThreadsForMove() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 8);
+    testBalancer1Internal (conf);
+  }
+
   @Test(timeout=100000)
   public void testBalancer2() throws Exception {
     testBalancer2Internal(new HdfsConfiguration());
