3434import org .apache .hadoop .hdfs .MiniDFSCluster ;
3535import org .apache .hadoop .hdfs .MiniDFSNNTopology ;
3636import org .apache .hadoop .hdfs .protocol .BlockListAsLongs ;
37+ import org .apache .hadoop .hdfs .protocol .ExtendedBlock ;
3738import org .apache .hadoop .hdfs .protocolPB .DatanodeProtocolClientSideTranslatorPB ;
3839import org .apache .hadoop .hdfs .server .common .Storage ;
3940import org .apache .hadoop .hdfs .server .datanode .fsdataset .FsDatasetSpi ;
6465import org .apache .commons .logging .Log ;
6566import org .apache .commons .logging .LogFactory ;
6667import org .mockito .Mockito ;
68+ import org .mockito .invocation .InvocationOnMock ;
69+ import org .mockito .stubbing .Answer ;
6770
6871import static org .apache .hadoop .hdfs .DFSConfigKeys .DFS_DATANODE_DATA_DIR_KEY ;
6972import static org .hamcrest .CoreMatchers .anyOf ;
7780import static org .junit .Assert .fail ;
7881import static org .mockito .Matchers .any ;
7982import static org .mockito .Matchers .anyString ;
83+ import static org .mockito .Mockito .doAnswer ;
8084import static org .mockito .Mockito .timeout ;
8185
8286public class TestDataNodeHotSwapVolumes {
@@ -577,6 +581,7 @@ private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx)
577581 final DataNode dn = cluster .getDataNodes ().get (dataNodeIdx );
578582 final FileSystem fs = cluster .getFileSystem ();
579583 final Path testFile = new Path ("/test" );
584+ final long lastTimeDiskErrorCheck = dn .getLastDiskErrorCheck ();
580585
581586 FSDataOutputStream out = fs .create (testFile , REPLICATION );
582587
@@ -586,6 +591,23 @@ private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx)
586591 out .write (writeBuf );
587592 out .hflush ();
588593
594+ // Make FsDatasetSpi#finalizeBlock a time-consuming operation. So if the
595+ // BlockReceiver releases volume reference before finalizeBlock(), the blocks
596+ // on the volume will be removed, and finalizeBlock() throws IOE.
597+ final FsDatasetSpi <? extends FsVolumeSpi > data = dn .data ;
598+ dn .data = Mockito .spy (data );
599+ doAnswer (new Answer <Object >() {
600+ public Object answer (InvocationOnMock invocation )
601+ throws IOException , InterruptedException {
602+ Thread .sleep (1000 );
603+ // Bypass the argument to FsDatasetImpl#finalizeBlock to verify that
604+ // the block is not removed, since the volume reference should not
605+ // be released at this point.
606+ data .finalizeBlock ((ExtendedBlock ) invocation .getArguments ()[0 ]);
607+ return null ;
608+ }
609+ }).when (dn .data ).finalizeBlock (any (ExtendedBlock .class ));
610+
589611 final CyclicBarrier barrier = new CyclicBarrier (2 );
590612
591613 List <String > oldDirs = getDataDirs (dn );
@@ -612,13 +634,19 @@ public void run() {
612634 out .hflush ();
613635 out .close ();
614636
637+ reconfigThread .join ();
638+
615639 // Verify the file has sufficient replications.
616640 DFSTestUtil .waitReplication (fs , testFile , REPLICATION );
617641 // Read the content back
618642 byte [] content = DFSTestUtil .readFileBuffer (fs , testFile );
619643 assertEquals (BLOCK_SIZE , content .length );
620644
621- reconfigThread .join ();
645+ // If an IOException thrown from BlockReceiver#run, it triggers
646+ // DataNode#checkDiskError(). So we can test whether checkDiskError() is called,
647+ // to see whether there is IOException in BlockReceiver#run().
648+ assertEquals (lastTimeDiskErrorCheck , dn .getLastDiskErrorCheck ());
649+
622650 if (!exceptions .isEmpty ()) {
623651 throw new IOException (exceptions .get (0 ).getCause ());
624652 }
0 commit comments