Просмотр исходного кода

HDFS-9781. FsDatasetImpl#getBlockReports can occasionally throw NullPointerException. Contributed by Manoj Govindassamy.

Xiao Chen 9 лет назад
Родитель
Сommit
07650bc37a

+ 16 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -1827,13 +1827,24 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     Map<String, BlockListAsLongs.Builder> builders =
         new HashMap<String, BlockListAsLongs.Builder>();
 
-    List<FsVolumeImpl> curVolumes = volumes.getVolumes();
-    for (FsVolumeSpi v : curVolumes) {
-      builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
-    }
-
+    List<FsVolumeImpl> curVolumes = null;
     try (AutoCloseableLock lock = datasetLock.acquire()) {
+      curVolumes = volumes.getVolumes();
+      for (FsVolumeSpi v : curVolumes) {
+        builders.put(v.getStorageID(), BlockListAsLongs.builder(maxDataLength));
+      }
+
+      Set<String> missingVolumesReported = new HashSet<>();
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+        String volStorageID = b.getVolume().getStorageID();
+        if (!builders.containsKey(volStorageID)) {
+          if (!missingVolumesReported.contains(volStorageID)) {
+            LOG.warn("Storage volume: " + volStorageID + " missing for the"
+                + " replica block: " + b + ". Probably being removed!");
+            missingVolumesReported.add(volStorageID);
+          }
+          continue;
+        }
         switch(b.getState()) {
           case FINALIZED:
           case RBW:

+ 40 - 9
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java

@@ -590,8 +590,15 @@ public class TestFsDatasetImpl {
     final ExtendedBlock eb = new ExtendedBlock(BLOCK_POOL_IDS[0], 0);
     final CountDownLatch startFinalizeLatch = new CountDownLatch(1);
     final CountDownLatch brReceivedLatch = new CountDownLatch(1);
+    final CountDownLatch volRemovedLatch = new CountDownLatch(1);
     class BlockReportThread extends Thread {
       public void run() {
+        // Lets wait for the volume remove process to start
+        try {
+          volRemovedLatch.await();
+        } catch (Exception e) {
+          LOG.info("Unexpected exception when waiting for vol removal:", e);
+        }
         LOG.info("Getting block report");
         dataset.getBlockReports(eb.getBlockPoolId());
         LOG.info("Successfully received block report");
@@ -599,18 +606,27 @@ public class TestFsDatasetImpl {
       }
     }
 
-    final BlockReportThread brt = new BlockReportThread();
     class ResponderThread extends Thread {
       public void run() {
         try (ReplicaHandler replica = dataset
             .createRbw(StorageType.DEFAULT, eb, false)) {
-          LOG.info("createRbw finished");
+          LOG.info("CreateRbw finished");
           startFinalizeLatch.countDown();
 
-          // Slow down while we're holding the reference to the volume
-          Thread.sleep(1000);
+          // Slow down while we're holding the reference to the volume.
+          // As we finalize a block, the volume is removed in parallel.
+          // Ignore any interrupts coming out of volume shutdown.
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ie) {
+            LOG.info("Ignoring ", ie);
+          }
+
+          // Lets wait for the other thread finish getting block report
+          brReceivedLatch.await();
+
           dataset.finalizeBlock(eb);
-          LOG.info("finalizeBlock finished");
+          LOG.info("FinalizeBlock finished");
         } catch (Exception e) {
           LOG.warn("Exception caught. This should not affect the test", e);
         }
@@ -621,13 +637,28 @@ public class TestFsDatasetImpl {
     res.start();
     startFinalizeLatch.await();
 
+    // Verify if block report can be received
+    // when volume is being removed
+    final BlockReportThread brt = new BlockReportThread();
+    brt.start();
+
     Set<File> volumesToRemove = new HashSet<>();
     volumesToRemove.add(
         StorageLocation.parse(dataset.getVolume(eb).getBasePath()).getFile());
-    LOG.info("Removing volume " + volumesToRemove);
-    // Verify block report can be received during this
-    brt.start();
-    dataset.removeVolumes(volumesToRemove, true);
+    /**
+     * TODO: {@link FsDatasetImpl#removeVolumes(Set, boolean)} is throwing
+     * IllegalMonitorStateException when there is a parallel reader/writer
+     * to the volume. Remove below try/catch block after fixing HDFS-10830.
+     */
+    try {
+      LOG.info("Removing volume " + volumesToRemove);
+      dataset.removeVolumes(volumesToRemove, true);
+    } catch (Exception e) {
+      LOG.info("Unexpected issue while removing volume: ", e);
+    } finally {
+      volRemovedLatch.countDown();
+    }
+
     LOG.info("Volumes removed");
     brReceivedLatch.await();
   }