
Commit ee13f8b (2 parents: 3de6c61 + 25b0e84)

Merge r1609845 through r1612502 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-6584@1612505 13f79535-47bb-0310-9956-ffa450edef68

File tree

37 files changed: +656 additions, −218 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

Lines changed: 4 additions & 0 deletions

@@ -324,6 +324,10 @@ Release 2.6.0 - UNRELEASED
     HDFS-6616. Add exclude-datanodes feature to WebHDFS redirection so that it
     will not redirect retries to the same datanode. (zhaoyunjiong via szetszwo)
 
+    HDFS-6702. Change DFSClient to pass the StorageType from the namenode to
+    datanodes and change datanode to write block replicas using the specified
+    storage type. (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)
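For context, the StorageType threaded through every file below is the per-replica storage-media hint the namenode now chooses for the client. A minimal sketch of the idea, assuming an enum along these lines (the real org.apache.hadoop.hdfs.StorageType in this branch may carry more values and helpers):

    package org.apache.hadoop.hdfs;

    /**
     * Sketch of the per-replica storage-media hint. DEFAULT mirrors the
     * pre-HDFS-6702 behavior, where the datanode picked a volume itself.
     */
    public enum StorageType {
      DISK,  // ordinary spinning disk
      SSD;   // flash-backed volume

      public static final StorageType DEFAULT = DISK;
    }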

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

Lines changed: 31 additions & 19 deletions

@@ -313,6 +313,7 @@ class DataStreamer extends Daemon {
     private DataInputStream blockReplyStream;
     private ResponseProcessor response = null;
     private volatile DatanodeInfo[] nodes = null; // list of targets for current block
+    private volatile StorageType[] storageTypes = null;
     private volatile String[] storageIDs = null;
     private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes =
         CacheBuilder.newBuilder()
@@ -417,10 +418,12 @@ private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
     }
 
     private void setPipeline(LocatedBlock lb) {
-      setPipeline(lb.getLocations(), lb.getStorageIDs());
+      setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
     }
-    private void setPipeline(DatanodeInfo[] nodes, String[] storageIDs) {
+    private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
+        String[] storageIDs) {
       this.nodes = nodes;
+      this.storageTypes = storageTypes;
       this.storageIDs = storageIDs;
     }
@@ -446,7 +449,7 @@ private void endBlock() {
       this.setName("DataStreamer for file " + src);
       closeResponder();
       closeStream();
-      setPipeline(null, null);
+      setPipeline(null, null, null);
       stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
     }
@@ -1031,10 +1034,12 @@ private void addDatanode2ExistingPipeline() throws IOException {
       //transfer replica
       final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
       final DatanodeInfo[] targets = {nodes[d]};
-      transfer(src, targets, lb.getBlockToken());
+      final StorageType[] targetStorageTypes = {storageTypes[d]};
+      transfer(src, targets, targetStorageTypes, lb.getBlockToken());
     }
 
     private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
+        final StorageType[] targetStorageTypes,
         final Token<BlockTokenIdentifier> blockToken) throws IOException {
       //transfer replica to the new datanode
       Socket sock = null;
@@ -1056,7 +1061,7 @@ private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
 
       //send the TRANSFER_BLOCK request
       new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
-          targets);
+          targets, targetStorageTypes);
       out.flush();
 
       //ack
@@ -1135,16 +1140,15 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
       failed.add(nodes[errorIndex]);
 
       DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
-      System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
-      System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
-          newnodes.length-errorIndex);
+      arraycopy(nodes, newnodes, errorIndex);
+
+      final StorageType[] newStorageTypes = new StorageType[newnodes.length];
+      arraycopy(storageTypes, newStorageTypes, errorIndex);
 
       final String[] newStorageIDs = new String[newnodes.length];
-      System.arraycopy(storageIDs, 0, newStorageIDs, 0, errorIndex);
-      System.arraycopy(storageIDs, errorIndex+1, newStorageIDs, errorIndex,
-          newStorageIDs.length-errorIndex);
+      arraycopy(storageIDs, newStorageIDs, errorIndex);
 
-      setPipeline(newnodes, newStorageIDs);
+      setPipeline(newnodes, newStorageTypes, newStorageIDs);
 
       // Just took care of a node error while waiting for a node restart
       if (restartingNodeIndex >= 0) {
@@ -1181,7 +1185,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
 
       // set up the pipeline again with the remaining nodes
       if (failPacket) { // for testing
-        success = createBlockOutputStream(nodes, newGS, isRecovery);
+        success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
         failPacket = false;
         try {
           // Give DNs time to send in bad reports. In real situations,
@@ -1190,7 +1194,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
           Thread.sleep(2000);
         } catch (InterruptedException ie) {}
       } else {
-        success = createBlockOutputStream(nodes, newGS, isRecovery);
+        success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
       }
 
       if (restartingNodeIndex >= 0) {
@@ -1242,6 +1246,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
     private LocatedBlock nextBlockOutputStream() throws IOException {
       LocatedBlock lb = null;
       DatanodeInfo[] nodes = null;
+      StorageType[] storageTypes = null;
       int count = dfsClient.getConf().nBlockWriteRetry;
       boolean success = false;
       ExtendedBlock oldBlock = block;
@@ -1264,11 +1269,12 @@ private LocatedBlock nextBlockOutputStream() throws IOException {
         bytesSent = 0;
         accessToken = lb.getBlockToken();
         nodes = lb.getLocations();
+        storageTypes = lb.getStorageTypes();
 
         //
         // Connect to first DataNode in the list.
         //
-        success = createBlockOutputStream(nodes, 0L, false);
+        success = createBlockOutputStream(nodes, storageTypes, 0L, false);
 
         if (!success) {
           DFSClient.LOG.info("Abandoning " + block);
@@ -1289,8 +1295,8 @@ private LocatedBlock nextBlockOutputStream() throws IOException {
     // connects to the first datanode in the pipeline
     // Returns true if success, otherwise return failure.
     //
-    private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
-        boolean recoveryFlag) {
+    private boolean createBlockOutputStream(DatanodeInfo[] nodes,
+        StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
       if (nodes.length == 0) {
         DFSClient.LOG.info("nodes are empty for write pipeline of block "
             + block);
@@ -1332,9 +1338,10 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
         // Xmit header info to datanode
         //
 
+        BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage;
         // send the request
-        new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
-            nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
+        new Sender(out).writeBlock(block, nodeStorageTypes[0], accessToken,
+            dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
             nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
             cachingStrategy.get());
@@ -2197,4 +2204,9 @@ ExtendedBlock getBlock() {
   public long getFileId() {
     return fileId;
   }
+
+  private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
+    System.arraycopy(srcs, 0, dsts, 0, skipIndex);
+    System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex);
+  }
 }
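The new private arraycopy helper collapses the three duplicated System.arraycopy pairs in setupPipelineForAppendOrRecovery into one generic "copy all but one index" routine. A standalone sketch of its semantics, with the helper copied verbatim from the patch and the surrounding demo class and data made up for illustration:

    import java.util.Arrays;

    public class ArraycopySkipDemo {
      // Copy srcs into dsts while skipping the element at skipIndex;
      // dsts.length is expected to be srcs.length - 1, as in the patch.
      private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
        System.arraycopy(srcs, 0, dsts, 0, skipIndex);
        System.arraycopy(srcs, skipIndex + 1, dsts, skipIndex, dsts.length - skipIndex);
      }

      public static void main(String[] args) {
        String[] nodes = {"dn0", "dn1", "dn2"};
        String[] remaining = new String[nodes.length - 1];
        arraycopy(nodes, remaining, 1);                  // drop the failed node at index 1
        System.out.println(Arrays.toString(remaining)); // prints [dn0, dn2]
      }
    }

Because the helper is generic, the same call shape now prunes nodes, storageTypes, and storageIDs in lockstep, which is what keeps the three pipeline arrays index-aligned after a datanode failure.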

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java

Lines changed: 18 additions & 3 deletions

@@ -23,6 +23,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -71,11 +72,20 @@ public void readBlock(final ExtendedBlock blk,
 
   /**
    * Write a block to a datanode pipeline.
-   *
+   * The receiver datanode of this call is the next datanode in the pipeline.
+   * The other downstream datanodes are specified by the targets parameter.
+   * Note that the receiver {@link DatanodeInfo} is not required in the
+   * parameter list since the receiver datanode knows its info. However, the
+   * {@link StorageType} for storing the replica in the receiver datanode is a
+   * parameter since the receiver datanode may support multiple storage types.
+   *
    * @param blk the block being written.
+   * @param storageType for storing the replica in the receiver datanode.
    * @param blockToken security token for accessing the block.
    * @param clientName client's name.
-   * @param targets target datanodes in the pipeline.
+   * @param targets other downstream datanodes in the pipeline.
+   * @param targetStorageTypes target {@link StorageType}s corresponding
+   *                           to the target datanodes.
    * @param source source datanode.
    * @param stage pipeline stage.
    * @param pipelineSize the size of the pipeline.
@@ -84,9 +94,11 @@ public void readBlock(final ExtendedBlock blk,
    * @param latestGenerationStamp the latest generation stamp of the block.
    */
   public void writeBlock(final ExtendedBlock blk,
+      final StorageType storageType,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes,
       final DatanodeInfo source,
       final BlockConstructionStage stage,
       final int pipelineSize,
@@ -110,7 +122,8 @@ public void writeBlock(final ExtendedBlock blk,
   public void transferBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
-      final DatanodeInfo[] targets) throws IOException;
+      final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes) throws IOException;
 
   /**
    * Request short circuit access file descriptors from a DataNode.
@@ -148,11 +161,13 @@ public void requestShortCircuitFds(final ExtendedBlock blk,
    * It is used for balancing purpose.
    *
    * @param blk the block being replaced.
+   * @param storageType the {@link StorageType} for storing the block.
   * @param blockToken security token for accessing the block.
   * @param delHint the hint for deleting the block in the original datanode.
   * @param source the source datanode for receiving the block.
   */
   public void replaceBlock(final ExtendedBlock blk,
+      final StorageType storageType,
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,
       final DatanodeInfo source) throws IOException;
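The new javadoc spells out the asymmetry: the receiver's identity is implicit, but its StorageType must travel on the wire. A consequence is that each hop consumes the head of the arrays for itself and forwards the tails, which is also why Sender (below) serializes targets starting at index 1. A hedged Java sketch of that forwarding convention; the class and method names here are hypothetical, not part of the patch:

    import java.util.Arrays;
    import org.apache.hadoop.hdfs.StorageType;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

    final class PipelineShiftSketch {
      /**
       * Illustrative only: the receiver stores its replica on types[0], and
       * the array tails describe the remaining downstream pipeline that the
       * next writeBlock call would carry.
       */
      static StorageType shift(DatanodeInfo[] targets, StorageType[] types) {
        final StorageType mine = types[0];                   // medium for this hop
        final DatanodeInfo[] downstream =
            Arrays.copyOfRange(targets, 1, targets.length);  // forwarded targets
        final StorageType[] downstreamTypes =
            Arrays.copyOfRange(types, 1, types.length);      // forwarded media hints
        assert downstream.length == downstreamTypes.length;  // arrays stay aligned
        return mine;
      }
    }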

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java

Lines changed: 9 additions & 2 deletions

@@ -25,6 +25,7 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
@@ -121,10 +122,13 @@ private void opReadBlock() throws IOException {
   /** Receive OP_WRITE_BLOCK */
   private void opWriteBlock(DataInputStream in) throws IOException {
     final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
+    final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
     writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
+        PBHelper.convertStorageType(proto.getStorageType()),
         PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
         proto.getHeader().getClientName(),
-        PBHelper.convert(proto.getTargetsList()),
+        targets,
+        PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
         PBHelper.convert(proto.getSource()),
         fromProto(proto.getStage()),
         proto.getPipelineSize(),
@@ -140,10 +144,12 @@ private void opWriteBlock(DataInputStream in) throws IOException {
   private void opTransferBlock(DataInputStream in) throws IOException {
     final OpTransferBlockProto proto =
         OpTransferBlockProto.parseFrom(vintPrefixed(in));
+    final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
     transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
         PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
         proto.getHeader().getClientName(),
-        PBHelper.convert(proto.getTargetsList()));
+        targets,
+        PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length));
   }
 
   /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_FDS} */
@@ -176,6 +182,7 @@ private void opRequestShortCircuitShm(DataInputStream in) throws IOException {
   private void opReplaceBlock(DataInputStream in) throws IOException {
     OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
     replaceBlock(PBHelper.convert(proto.getHeader().getBlock()),
+        PBHelper.convertStorageType(proto.getStorageType()),
         PBHelper.convert(proto.getHeader().getToken()),
         proto.getDelHint(),
         PBHelper.convert(proto.getSource()));
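PBHelper.convertStorageTypes is passed targets.length here, presumably so the receiver can tolerate an older writer that omits the new repeated field. A plausible sketch of such a backward-compatible converter; this is an assumption about PBHelper's behavior (and it is simplified to take StorageType values directly rather than the proto enum), not code from the patch:

    import java.util.List;
    import org.apache.hadoop.hdfs.StorageType;

    final class ConvertSketch {
      // Assumed behavior: pad a short (possibly empty) list from an old
      // client out to the expected pipeline length with the default type.
      static StorageType[] convertStorageTypes(List<StorageType> types, int expected) {
        final StorageType[] result = new StorageType[expected];
        for (int i = 0; i < expected; i++) {
          result[i] = i < types.size() ? types.get(i) : StorageType.DEFAULT;
        }
        return result;
      }
    }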

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java

Lines changed: 10 additions & 1 deletion

@@ -25,6 +25,7 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
@@ -111,9 +112,11 @@ public void readBlock(final ExtendedBlock blk,
 
   @Override
   public void writeBlock(final ExtendedBlock blk,
+      final StorageType storageType,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
       final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes,
       final DatanodeInfo source,
       final BlockConstructionStage stage,
       final int pipelineSize,
@@ -130,7 +133,9 @@ public void writeBlock(final ExtendedBlock blk,
 
     OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
       .setHeader(header)
+      .setStorageType(PBHelper.convertStorageType(storageType))
       .addAllTargets(PBHelper.convert(targets, 1))
+      .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes, 1))
       .setStage(toProto(stage))
       .setPipelineSize(pipelineSize)
       .setMinBytesRcvd(minBytesRcvd)
@@ -150,12 +155,14 @@ public void writeBlock(final ExtendedBlock blk,
   public void transferBlock(final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> blockToken,
       final String clientName,
-      final DatanodeInfo[] targets) throws IOException {
+      final DatanodeInfo[] targets,
+      final StorageType[] targetStorageTypes) throws IOException {
 
     OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
       .setHeader(DataTransferProtoUtil.buildClientHeader(
           blk, clientName, blockToken))
       .addAllTargets(PBHelper.convert(targets))
+      .addAllTargetStorageTypes(PBHelper.convertStorageTypes(targetStorageTypes))
       .build();
 
     send(out, Op.TRANSFER_BLOCK, proto);
@@ -196,11 +203,13 @@ public void requestShortCircuitShm(String clientName) throws IOException {
 
   @Override
   public void replaceBlock(final ExtendedBlock blk,
+      final StorageType storageType,
       final Token<BlockTokenIdentifier> blockToken,
       final String delHint,
       final DatanodeInfo source) throws IOException {
     OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
       .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+      .setStorageType(PBHelper.convertStorageType(storageType))
       .setDelHint(delHint)
       .setSource(PBHelper.convertDatanodeInfo(source))
       .build();
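Note the asymmetry in Sender.writeBlock: storageType goes out as a scalar for the receiver, while PBHelper.convert(targets, 1) and convertStorageTypes(targetStorageTypes, 1) skip index 0, so only the downstream entries are serialized. For callers with no storage-type preference (for example, pre-existing balancer or transfer code paths), a natural migration is to fill the new array with the default type. A hedged sketch; the helper class and method name are hypothetical:

    import java.util.Arrays;
    import org.apache.hadoop.hdfs.StorageType;

    final class StorageTypeDefaults {
      // Hypothetical convenience: an all-DEFAULT array matching a target
      // list, which legacy callers can pass to the widened signatures.
      static StorageType[] defaults(int n) {
        final StorageType[] types = new StorageType[n];
        Arrays.fill(types, StorageType.DEFAULT);
        return types;
      }
    }

    // usage sketch:
    //   new Sender(out).transferBlock(blk, blockToken, clientName,
    //       targets, StorageTypeDefaults.defaults(targets.length));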
