@@ -1412,38 +1412,59 @@ public synchronized ReplicaInPipeline convertTemporaryToRbw(
14121412 }
14131413
14141414 @ Override // FsDatasetSpi
1415- public synchronized ReplicaHandler createTemporary (
1415+ public ReplicaHandler createTemporary (
14161416 StorageType storageType , ExtendedBlock b ) throws IOException {
1417- ReplicaInfo replicaInfo = volumeMap .get (b .getBlockPoolId (), b .getBlockId ());
1418- if (replicaInfo != null ) {
1419- if (replicaInfo .getGenerationStamp () < b .getGenerationStamp ()
1420- && replicaInfo instanceof ReplicaInPipeline ) {
1421- // Stop the previous writer
1422- ((ReplicaInPipeline )replicaInfo )
1423- .stopWriter (datanode .getDnConf ().getXceiverStopTimeout ());
1424- invalidate (b .getBlockPoolId (), new Block []{replicaInfo });
1425- } else {
1426- throw new ReplicaAlreadyExistsException ("Block " + b +
1427- " already exists in state " + replicaInfo .getState () +
1428- " and thus cannot be created." );
1417+ long startTimeMs = Time .monotonicNow ();
1418+ long writerStopTimeoutMs = datanode .getDnConf ().getXceiverStopTimeout ();
1419+ ReplicaInfo lastFoundReplicaInfo = null ;
1420+ do {
1421+ synchronized (this ) {
1422+ ReplicaInfo currentReplicaInfo =
1423+ volumeMap .get (b .getBlockPoolId (), b .getBlockId ());
1424+ if (currentReplicaInfo == lastFoundReplicaInfo ) {
1425+ if (lastFoundReplicaInfo != null ) {
1426+ invalidate (b .getBlockPoolId (), new Block [] { lastFoundReplicaInfo });
1427+ }
1428+ FsVolumeReference ref =
1429+ volumes .getNextVolume (storageType , b .getNumBytes ());
1430+ FsVolumeImpl v = (FsVolumeImpl ) ref .getVolume ();
1431+ // create a temporary file to hold block in the designated volume
1432+ File f ;
1433+ try {
1434+ f = v .createTmpFile (b .getBlockPoolId (), b .getLocalBlock ());
1435+ } catch (IOException e ) {
1436+ IOUtils .cleanup (null , ref );
1437+ throw e ;
1438+ }
1439+ ReplicaInPipeline newReplicaInfo =
1440+ new ReplicaInPipeline (b .getBlockId (), b .getGenerationStamp (), v ,
1441+ f .getParentFile (), 0 );
1442+ volumeMap .add (b .getBlockPoolId (), newReplicaInfo );
1443+ return new ReplicaHandler (newReplicaInfo , ref );
1444+ } else {
1445+ if (!(currentReplicaInfo .getGenerationStamp () < b
1446+ .getGenerationStamp () && currentReplicaInfo instanceof ReplicaInPipeline )) {
1447+ throw new ReplicaAlreadyExistsException ("Block " + b
1448+ + " already exists in state " + currentReplicaInfo .getState ()
1449+ + " and thus cannot be created." );
1450+ }
1451+ lastFoundReplicaInfo = currentReplicaInfo ;
1452+ }
14291453 }
1430- }
14311454
1432- FsVolumeReference ref = volumes .getNextVolume (storageType , b .getNumBytes ());
1433- FsVolumeImpl v = (FsVolumeImpl ) ref .getVolume ();
1434- // create a temporary file to hold block in the designated volume
1435- File f ;
1436- try {
1437- f = v .createTmpFile (b .getBlockPoolId (), b .getLocalBlock ());
1438- } catch (IOException e ) {
1439- IOUtils .cleanup (null , ref );
1440- throw e ;
1441- }
1455+ // Hang too long, just bail out. This is not supposed to happen.
1456+ long writerStopMs = Time .monotonicNow () - startTimeMs ;
1457+ if (writerStopMs > writerStopTimeoutMs ) {
1458+ LOG .warn ("Unable to stop existing writer for block " + b + " after "
1459+ + writerStopMs + " miniseconds." );
1460+ throw new IOException ("Unable to stop existing writer for block " + b
1461+ + " after " + writerStopMs + " miniseconds." );
1462+ }
14421463
1443- ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline ( b . getBlockId (),
1444- b . getGenerationStamp (), v , f . getParentFile (), 0 );
1445- volumeMap . add ( b . getBlockPoolId (), newReplicaInfo );
1446- return new ReplicaHandler ( newReplicaInfo , ref );
1464+ // Stop the previous writer
1465+ (( ReplicaInPipeline ) lastFoundReplicaInfo )
1466+ . stopWriter ( writerStopTimeoutMs );
1467+ } while ( true );
14471468 }
14481469
14491470 /**
0 commit comments