@@ -1414,38 +1414,59 @@ public synchronized ReplicaInPipeline convertTemporaryToRbw(
14141414 }
14151415
14161416 @ Override // FsDatasetSpi
1417- public synchronized ReplicaHandler createTemporary (
1417+ public ReplicaHandler createTemporary (
14181418 StorageType storageType , ExtendedBlock b ) throws IOException {
1419- ReplicaInfo replicaInfo = volumeMap .get (b .getBlockPoolId (), b .getBlockId ());
1420- if (replicaInfo != null ) {
1421- if (replicaInfo .getGenerationStamp () < b .getGenerationStamp ()
1422- && replicaInfo instanceof ReplicaInPipeline ) {
1423- // Stop the previous writer
1424- ((ReplicaInPipeline )replicaInfo )
1425- .stopWriter (datanode .getDnConf ().getXceiverStopTimeout ());
1426- invalidate (b .getBlockPoolId (), new Block []{replicaInfo });
1427- } else {
1428- throw new ReplicaAlreadyExistsException ("Block " + b +
1429- " already exists in state " + replicaInfo .getState () +
1430- " and thus cannot be created." );
1419+ long startTimeMs = Time .monotonicNow ();
1420+ long writerStopTimeoutMs = datanode .getDnConf ().getXceiverStopTimeout ();
1421+ ReplicaInfo lastFoundReplicaInfo = null ;
1422+ do {
1423+ synchronized (this ) {
1424+ ReplicaInfo currentReplicaInfo =
1425+ volumeMap .get (b .getBlockPoolId (), b .getBlockId ());
1426+ if (currentReplicaInfo == lastFoundReplicaInfo ) {
1427+ if (lastFoundReplicaInfo != null ) {
1428+ invalidate (b .getBlockPoolId (), new Block [] { lastFoundReplicaInfo });
1429+ }
1430+ FsVolumeReference ref =
1431+ volumes .getNextVolume (storageType , b .getNumBytes ());
1432+ FsVolumeImpl v = (FsVolumeImpl ) ref .getVolume ();
1433+ // create a temporary file to hold block in the designated volume
1434+ File f ;
1435+ try {
1436+ f = v .createTmpFile (b .getBlockPoolId (), b .getLocalBlock ());
1437+ } catch (IOException e ) {
1438+ IOUtils .cleanup (null , ref );
1439+ throw e ;
1440+ }
1441+ ReplicaInPipeline newReplicaInfo =
1442+ new ReplicaInPipeline (b .getBlockId (), b .getGenerationStamp (), v ,
1443+ f .getParentFile (), 0 );
1444+ volumeMap .add (b .getBlockPoolId (), newReplicaInfo );
1445+ return new ReplicaHandler (newReplicaInfo , ref );
1446+ } else {
1447+ if (!(currentReplicaInfo .getGenerationStamp () < b
1448+ .getGenerationStamp () && currentReplicaInfo instanceof ReplicaInPipeline )) {
1449+ throw new ReplicaAlreadyExistsException ("Block " + b
1450+ + " already exists in state " + currentReplicaInfo .getState ()
1451+ + " and thus cannot be created." );
1452+ }
1453+ lastFoundReplicaInfo = currentReplicaInfo ;
1454+ }
14311455 }
1432- }
14331456
1434- FsVolumeReference ref = volumes .getNextVolume (storageType , b .getNumBytes ());
1435- FsVolumeImpl v = (FsVolumeImpl ) ref .getVolume ();
1436- // create a temporary file to hold block in the designated volume
1437- File f ;
1438- try {
1439- f = v .createTmpFile (b .getBlockPoolId (), b .getLocalBlock ());
1440- } catch (IOException e ) {
1441- IOUtils .cleanup (null , ref );
1442- throw e ;
1443- }
1457+ // Hang too long, just bail out. This is not supposed to happen.
1458+ long writerStopMs = Time .monotonicNow () - startTimeMs ;
1459+ if (writerStopMs > writerStopTimeoutMs ) {
1460+ LOG .warn ("Unable to stop existing writer for block " + b + " after "
1461+ + writerStopMs + " miniseconds." );
1462+ throw new IOException ("Unable to stop existing writer for block " + b
1463+ + " after " + writerStopMs + " miniseconds." );
1464+ }
14441465
1445- ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline ( b . getBlockId (),
1446- b . getGenerationStamp (), v , f . getParentFile (), 0 );
1447- volumeMap . add ( b . getBlockPoolId (), newReplicaInfo );
1448- return new ReplicaHandler ( newReplicaInfo , ref );
1466+ // Stop the previous writer
1467+ (( ReplicaInPipeline ) lastFoundReplicaInfo )
1468+ . stopWriter ( writerStopTimeoutMs );
1469+ } while ( true );
14491470 }
14501471
14511472 /**
0 commit comments