2121
2222import java .util .ArrayList ;
2323import java .util .Collection ;
24+ import java .util .EnumSet ;
2425import java .util .HashSet ;
2526import java .util .List ;
2627import java .util .Set ;
@@ -218,7 +219,8 @@ private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
218219 boolean avoidStaleNodes = (stats != null
219220 && stats .isAvoidingStaleDataNodesForWrite ());
220221 final Node localNode = chooseTarget (numOfReplicas , writer , excludedNodes ,
221- blocksize , maxNodesPerRack , results , avoidStaleNodes , storagePolicy );
222+ blocksize , maxNodesPerRack , results , avoidStaleNodes , storagePolicy ,
223+ EnumSet .noneOf (StorageType .class ), results .isEmpty ());
222224 if (!returnChosenNodes ) {
223225 results .removeAll (chosenStorage );
224226 }
@@ -238,7 +240,40 @@ private int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
238240 int maxNodesPerRack = (totalNumOfReplicas -1 )/clusterMap .getNumOfRacks ()+2 ;
239241 return new int [] {numOfReplicas , maxNodesPerRack };
240242 }
241-
243+
244+ private static List <StorageType > selectStorageTypes (
245+ final BlockStoragePolicy storagePolicy ,
246+ final short replication ,
247+ final Iterable <StorageType > chosen ,
248+ final EnumSet <StorageType > unavailableStorages ,
249+ final boolean isNewBlock ) {
250+ final List <StorageType > storageTypes = storagePolicy .chooseStorageTypes (
251+ replication , chosen );
252+ final List <StorageType > removed = new ArrayList <StorageType >();
253+ for (int i = storageTypes .size () - 1 ; i >= 0 ; i --) {
254+ // replace/remove unavailable storage types.
255+ final StorageType t = storageTypes .get (i );
256+ if (unavailableStorages .contains (t )) {
257+ final StorageType fallback = isNewBlock ?
258+ storagePolicy .getCreationFallback (unavailableStorages )
259+ : storagePolicy .getReplicationFallback (unavailableStorages );
260+ if (fallback == null ) {
261+ removed .add (storageTypes .remove (i ));
262+ } else {
263+ storageTypes .set (i , fallback );
264+ }
265+ }
266+ }
267+ if (storageTypes .size () < replication ) {
268+ LOG .warn ("Failed to place enough replicas: replication is " + replication
269+ + " but only " + storageTypes .size () + " storage types can be selected "
270+ + "(selected=" + storageTypes
271+ + ", unavailable=" + unavailableStorages
272+ + ", removed=" + removed
273+ + ", policy=" + storagePolicy + ")" );
274+ }
275+ return storageTypes ;
276+ }
242277 /**
243278 * choose <i>numOfReplicas</i> from all data nodes
244279 * @param numOfReplicas additional number of replicas wanted
@@ -257,58 +292,74 @@ private Node chooseTarget(int numOfReplicas,
257292 final int maxNodesPerRack ,
258293 final List <DatanodeStorageInfo > results ,
259294 final boolean avoidStaleNodes ,
260- final BlockStoragePolicy storagePolicy ) {
295+ final BlockStoragePolicy storagePolicy ,
296+ final EnumSet <StorageType > unavailableStorages ,
297+ final boolean newBlock ) {
261298 if (numOfReplicas == 0 || clusterMap .getNumOfLeaves ()==0 ) {
262299 return writer ;
263300 }
264- int totalReplicasExpected = numOfReplicas + results .size ();
265-
266- int numOfResults = results .size ();
267- boolean newBlock = (numOfResults ==0 );
301+ final int numOfResults = results .size ();
302+ final int totalReplicasExpected = numOfReplicas + numOfResults ;
268303 if ((writer == null || !(writer instanceof DatanodeDescriptor )) && !newBlock ) {
269304 writer = results .get (0 ).getDatanodeDescriptor ();
270305 }
271306
272307 // Keep a copy of original excludedNodes
273308 final Set <Node > oldExcludedNodes = avoidStaleNodes ?
274309 new HashSet <Node >(excludedNodes ) : null ;
275- final List <StorageType > storageTypes = storagePolicy .chooseStorageTypes (
276- (short )totalReplicasExpected , DatanodeStorageInfo .toStorageTypes (results ));
310+
311+ // choose storage types; use fallbacks for unavailable storages
312+ final List <StorageType > storageTypes = selectStorageTypes (storagePolicy ,
313+ (short )totalReplicasExpected , DatanodeStorageInfo .toStorageTypes (results ),
314+ unavailableStorages , newBlock );
315+
316+ StorageType curStorageType = null ;
277317 try {
318+ if ((numOfReplicas = storageTypes .size ()) == 0 ) {
319+ throw new NotEnoughReplicasException (
320+ "All required storage types are unavailable: "
321+ + " unavailableStorages=" + unavailableStorages
322+ + ", storagePolicy=" + storagePolicy );
323+ }
324+
278325 if (numOfResults == 0 ) {
326+ curStorageType = storageTypes .remove (0 );
279327 writer = chooseLocalStorage (writer , excludedNodes , blocksize ,
280- maxNodesPerRack , results , avoidStaleNodes , storageTypes . remove ( 0 ) , true )
328+ maxNodesPerRack , results , avoidStaleNodes , curStorageType , true )
281329 .getDatanodeDescriptor ();
282330 if (--numOfReplicas == 0 ) {
283331 return writer ;
284332 }
285333 }
286334 final DatanodeDescriptor dn0 = results .get (0 ).getDatanodeDescriptor ();
287335 if (numOfResults <= 1 ) {
336+ curStorageType = storageTypes .remove (0 );
288337 chooseRemoteRack (1 , dn0 , excludedNodes , blocksize , maxNodesPerRack ,
289- results , avoidStaleNodes , storageTypes . remove ( 0 ) );
338+ results , avoidStaleNodes , curStorageType );
290339 if (--numOfReplicas == 0 ) {
291340 return writer ;
292341 }
293342 }
294343 if (numOfResults <= 2 ) {
295344 final DatanodeDescriptor dn1 = results .get (1 ).getDatanodeDescriptor ();
345+ curStorageType = storageTypes .remove (0 );
296346 if (clusterMap .isOnSameRack (dn0 , dn1 )) {
297347 chooseRemoteRack (1 , dn0 , excludedNodes , blocksize , maxNodesPerRack ,
298- results , avoidStaleNodes , storageTypes . remove ( 0 ) );
348+ results , avoidStaleNodes , curStorageType );
299349 } else if (newBlock ){
300350 chooseLocalRack (dn1 , excludedNodes , blocksize , maxNodesPerRack ,
301- results , avoidStaleNodes , storageTypes . remove ( 0 ) );
351+ results , avoidStaleNodes , curStorageType );
302352 } else {
303353 chooseLocalRack (writer , excludedNodes , blocksize , maxNodesPerRack ,
304- results , avoidStaleNodes , storageTypes . remove ( 0 ) );
354+ results , avoidStaleNodes , curStorageType );
305355 }
306356 if (--numOfReplicas == 0 ) {
307357 return writer ;
308358 }
309359 }
360+ curStorageType = storageTypes .remove (0 );
310361 chooseRandom (numOfReplicas , NodeBase .ROOT , excludedNodes , blocksize ,
311- maxNodesPerRack , results , avoidStaleNodes , storageTypes . remove ( 0 ) );
362+ maxNodesPerRack , results , avoidStaleNodes , curStorageType );
312363 } catch (NotEnoughReplicasException e ) {
313364 final String message = "Failed to place enough replicas, still in need of "
314365 + (totalReplicasExpected - results .size ()) + " to reach "
@@ -333,7 +384,16 @@ private Node chooseTarget(int numOfReplicas,
333384 // if the NotEnoughReplicasException was thrown in chooseRandom().
334385 numOfReplicas = totalReplicasExpected - results .size ();
335386 return chooseTarget (numOfReplicas , writer , oldExcludedNodes , blocksize ,
336- maxNodesPerRack , results , false , storagePolicy );
387+ maxNodesPerRack , results , false , storagePolicy , unavailableStorages ,
388+ newBlock );
389+ }
390+
391+ if (storageTypes .size () > 0 ) {
392+ // Retry chooseTarget with fallback storage types
393+ unavailableStorages .add (curStorageType );
394+ return chooseTarget (numOfReplicas , writer , excludedNodes , blocksize ,
395+ maxNodesPerRack , results , false , storagePolicy , unavailableStorages ,
396+ newBlock );
337397 }
338398 }
339399 return writer ;
0 commit comments