
HDFS-5406. Merge r1551093 and r1551110 from trunk to branch-2

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1556084 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal, 11 years ago
parent
commit
a7417a0af9

+ 22 - 19
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -22,6 +22,7 @@ import static org.apache.hadoop.util.Time.now;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
 
@@ -270,7 +271,8 @@ class BPServiceActor implements Runnable {
   private void reportReceivedDeletedBlocks() throws IOException {
 
     // Generate a list of the pending reports for each storage under the lock
-    Map<String, ReceivedDeletedBlockInfo[]> blockArrays = Maps.newHashMap();
+    ArrayList<StorageReceivedDeletedBlocks> reports =
+        new ArrayList<StorageReceivedDeletedBlocks>(pendingIncrementalBRperStorage.size());
     synchronized (pendingIncrementalBRperStorage) {
       for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
            pendingIncrementalBRperStorage.entrySet()) {
@@ -283,33 +285,34 @@ class BPServiceActor implements Runnable {
           pendingReceivedRequests =
               (pendingReceivedRequests > rdbi.length ?
                   (pendingReceivedRequests - rdbi.length) : 0);
-          blockArrays.put(storageUuid, rdbi);
+          reports.add(new StorageReceivedDeletedBlocks(storageUuid, rdbi));
         }
       }
     }
 
+    if (reports.size() == 0) {
+      // Nothing new to report.
+      return;
+    }
+
     // Send incremental block reports to the Namenode outside the lock
-    for (Map.Entry<String, ReceivedDeletedBlockInfo[]> entry :
-         blockArrays.entrySet()) {
-      final String storageUuid = entry.getKey();
-      final ReceivedDeletedBlockInfo[] rdbi = entry.getValue();
-
-      StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks(
-          storageUuid, rdbi) };
-      boolean success = false;
-      try {
-        bpNamenode.blockReceivedAndDeleted(bpRegistration,
-            bpos.getBlockPoolId(), report);
-        success = true;
-      } finally {
-        if (!success) {
-          synchronized (pendingIncrementalBRperStorage) {
+    boolean success = false;
+    try {
+      bpNamenode.blockReceivedAndDeleted(bpRegistration,
+          bpos.getBlockPoolId(),
+          reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
+      success = true;
+    } finally {
+      if (!success) {
+        synchronized (pendingIncrementalBRperStorage) {
+          for (StorageReceivedDeletedBlocks report : reports) {
             // If we didn't succeed in sending the report, put all of the
             // blocks back onto our queue, but only in the case where we
             // didn't put something newer in the meantime.
             PerStoragePendingIncrementalBR perStorageMap =
-                pendingIncrementalBRperStorage.get(storageUuid);
-            pendingReceivedRequests += perStorageMap.putMissingBlockInfos(rdbi);
+                pendingIncrementalBRperStorage.get(report.getStorageID());
+            pendingReceivedRequests +=
+                perStorageMap.putMissingBlockInfos(report.getBlocks());
           }
         }
       }

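The refactored reportReceivedDeletedBlocks() above follows a common concurrency pattern: snapshot the pending work under the lock, release the lock before the slow RPC, and re-queue anything that was not delivered. A minimal standalone sketch of the pattern (hypothetical names, not the actual Hadoop classes; the real code's re-queue via putMissingBlockInfos is more careful, restoring only entries that were not superseded in the meantime):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class PendingReportQueue {
      private final List<String> pending = new ArrayList<String>();

      void add(String item) {
        synchronized (pending) {
          pending.add(item);
        }
      }

      void flush(RpcTarget target) throws IOException {
        // 1. Snapshot and clear the queue under the lock.
        List<String> snapshot;
        synchronized (pending) {
          snapshot = new ArrayList<String>(pending);
          pending.clear();
        }
        if (snapshot.isEmpty()) {
          return; // Nothing new to report.
        }
        // 2. Perform the slow RPC outside the lock.
        boolean success = false;
        try {
          target.send(snapshot);
          success = true;
        } finally {
          // 3. On failure, put the items back so they are retried.
          if (!success) {
            synchronized (pending) {
              pending.addAll(0, snapshot);
            }
          }
        }
      }

      interface RpcTarget {
        void send(List<String> items) throws IOException;
      }
    }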
+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -983,6 +983,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
       StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException {
     verifyRequest(nodeReg);
+    metrics.incrBlockReceivedAndDeletedOps();
     if(blockStateChangeLog.isDebugEnabled()) {
       blockStateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
           +"from "+nodeReg+" "+receivedAndDeletedBlocks.length

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java

@@ -71,6 +71,8 @@ public class NameNodeMetrics {
   MutableCounterLong listSnapshottableDirOps;
   @Metric("Number of snapshotDiffReport operations")
   MutableCounterLong snapshotDiffReportOps;
+  @Metric("Number of blockReceivedAndDeleted calls")
+  MutableCounterLong blockReceivedAndDeletedOps;
 
   @Metric("Journal transactions") MutableRate transactions;
   @Metric("Journal syncs") MutableRate syncs;
@@ -210,6 +212,10 @@ public class NameNodeMetrics {
     snapshotDiffReportOps.incr();
   }
 
+  public void incrBlockReceivedAndDeletedOps() {
+    blockReceivedAndDeletedOps.incr();
+  }
+
   public void addTransaction(long latency) {
     transactions.add(latency);
   }

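The counter name exported to the NameNodeActivity metrics record is derived from the @Metric field name, so blockReceivedAndDeletedOps surfaces as "BlockReceivedAndDeletedOps". The new TestIncrementalBrVariations below reads it with the MetricsAsserts helpers; a short usage sketch of those same calls:

    import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
    import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
    import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

    // Read the counter before the operation under test...
    long ops = getLongCounter("BlockReceivedAndDeletedOps",
                              getMetrics("NameNodeActivity"));
    // ... trigger exactly one incremental block report, then verify.
    assertCounter("BlockReceivedAndDeletedOps", ops + 1,
                  getMetrics("NameNodeActivity"));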
+ 3 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -2144,17 +2144,14 @@ public class MiniDFSCluster {
   }
 
   /**
-   * Get a storage directory for a datanode. There are two storage directories
-   * per datanode:
+   * Get a storage directory for a datanode.
    * <ol>
    * <li><base directory>/data/data<2*dnIndex + 1></li>
    * <li><base directory>/data/data<2*dnIndex + 2></li>
    * </ol>
    *
    * @param dnIndex datanode index (starts from 0)
-   * @param dirIndex directory index (0 or 1). Index 0 provides access to the
-   *          first storage directory. Index 1 provides access to the second
-   *          storage directory.
+   * @param dirIndex directory index.
    * @return Storage directory
    */
   public static File getStorageDir(int dnIndex, int dirIndex) {
@@ -2165,7 +2162,7 @@ public class MiniDFSCluster {
    * Calculate the DN instance-specific path for appending to the base dir
    * to determine the location of the storage of a DN instance in the mini cluster
    * @param dnIndex datanode index
-   * @param dirIndex directory index (0 or 1).
+   * @param dirIndex directory index.
    * @return
    */
   private static String getStorageDirPath(int dnIndex, int dirIndex) {

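With the "0 or 1" restriction removed from the Javadoc, dirIndex is no longer limited to two directories per datanode. For the two-directory layout the Javadoc still lists, the naming works out as in this hypothetical helper (an illustration of the documented scheme, not the actual getStorageDirPath() implementation):

    // Assumes exactly two storage directories per DN, per the <ol> above.
    static String storageDirName(int dnIndex, int dirIndex) {
      return "data/data" + (2 * dnIndex + dirIndex + 1);
    }
    // storageDirName(0, 0) -> "data/data1"
    // storageDirName(1, 1) -> "data/data4"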
+ 135 - 36
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java

@@ -71,7 +71,15 @@ import org.mockito.invocation.InvocationOnMock;
 /**
  * This test simulates a variety of situations when blocks are being
  * intentionally corrupted, unexpectedly modified, and so on before a block
- * report is happening
+ * report is happening.
+ *
+ * For each test case it runs two variations:
+ *  #1 - For a given DN, the first variation sends block reports for all
+ *       storages in a single call to the NN.
+ *  #2 - For a given DN, the second variation sends block reports for each
+ *       storage in a separate call.
+ *
+ * The behavior should be the same in either variation.
  */
 public class TestBlockReport {
   public static final Log LOG = LogFactory.getLog(TestBlockReport.class);
@@ -157,6 +165,113 @@ public class TestBlockReport {
     return reports;
   }
 
+  /**
+   * Utility routine to send block reports to the NN, either in a single call
+   * or reporting one storage per call.
+   *
+   * @param dnR
+   * @param poolId
+   * @param reports
+   * @param needtoSplit
+   * @throws IOException
+   */
+  private void sendBlockReports(DatanodeRegistration dnR, String poolId,
+      StorageBlockReport[] reports, boolean needtoSplit) throws IOException {
+    if (!needtoSplit) {
+      LOG.info("Sending combined block reports for " + dnR);
+      cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
+    } else {
+      for (StorageBlockReport report : reports) {
+        LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
+        StorageBlockReport[] singletonReport = { report };
+        cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport);
+      }
+    }
+  }
+
+  /**
+   * Test variations blockReport_01 through blockReport_09 with combined
+   * and split block reports.
+   */
+  @Test
+  public void blockReportCombined_01() throws IOException {
+    blockReport_01(false);
+  }
+
+  @Test
+  public void blockReportSplit_01() throws IOException {
+    blockReport_01(true);
+  }
+
+  @Test
+  public void blockReportCombined_02() throws IOException {
+    blockReport_02(false);
+  }
+
+  @Test
+  public void blockReportSplit_02() throws IOException {
+    blockReport_02(true);
+  }
+
+  @Test
+  public void blockReportCombined_03() throws IOException {
+    blockReport_03(false);
+  }
+
+  @Test
+  public void blockReportSplit_03() throws IOException {
+    blockReport_03(true);
+  }
+
+  @Test
+  public void blockReportCombined_04() throws IOException {
+    blockReport_04(false);
+  }
+
+  @Test
+  public void blockReportSplit_04() throws IOException {
+    blockReport_04(true);
+  }
+
+  @Test
+  public void blockReportCombined_06() throws Exception {
+    blockReport_06(false);
+  }
+
+  @Test
+  public void blockReportSplit_06() throws Exception {
+    blockReport_06(true);
+  }
+
+  @Test
+  public void blockReportCombined_07() throws Exception {
+    blockReport_07(false);
+  }
+
+  @Test
+  public void blockReportSplit_07() throws Exception {
+    blockReport_07(true);
+  }
+
+  @Test
+  public void blockReportCombined_08() throws Exception {
+    blockReport_08(false);
+  }
+
+  @Test
+  public void blockReportSplit_08() throws Exception {
+    blockReport_08(true);
+  }
+
+  @Test
+  public void blockReportCombined_09() throws Exception {
+    blockReport_09(false);
+  }
+
+  @Test
+  public void blockReportSplit_09() throws Exception {
+    blockReport_09(true);
+  }
   /**
    * Test write a file, verifies and closes it. Then the length of the blocks
    * are messed up and BlockReport is forced.
@@ -164,8 +279,7 @@ public class TestBlockReport {
    *
    * @throws java.io.IOException on an error
    */
-  @Test
-  public void blockReport_01() throws IOException {
+  private void blockReport_01(boolean splitBlockReports) throws IOException {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path filePath = new Path("/" + METHOD_NAME + ".dat");
 
@@ -198,7 +312,7 @@ public class TestBlockReport {
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
     StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
-    cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
+    sendBlockReports(dnR, poolId, reports, splitBlockReports);
 
     List<LocatedBlock> blocksAfterReport =
       DFSTestUtil.getAllBlocks(fs.open(filePath));
@@ -224,8 +338,7 @@ public class TestBlockReport {
    *
    * @throws IOException in case of errors
    */
-  @Test
-  public void blockReport_02() throws IOException {
+  private void blockReport_02(boolean splitBlockReports) throws IOException {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     LOG.info("Running test " + METHOD_NAME);
 
@@ -280,7 +393,7 @@ public class TestBlockReport {
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId);
     StorageBlockReport[] reports = getBlockReports(dn0, poolId, false, false);
-    cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
+    sendBlockReports(dnR, poolId, reports, splitBlockReports);
 
     BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem()
         .getBlockManager());
@@ -301,8 +414,7 @@ public class TestBlockReport {
    *
    * @throws IOException in case of an error
    */
-  @Test
-  public void blockReport_03() throws IOException {
+  private void blockReport_03(boolean splitBlockReports) throws IOException {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path filePath = new Path("/" + METHOD_NAME + ".dat");
     ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath);
@@ -312,11 +424,7 @@ public class TestBlockReport {
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
     StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false);
-    DatanodeCommand dnCmd =
-      cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Got the command: " + dnCmd);
-    }
+    sendBlockReports(dnR, poolId, reports, splitBlockReports);
     printStats();
 
     assertThat("Wrong number of corrupt blocks",
@@ -333,8 +441,7 @@ public class TestBlockReport {
    *
    * @throws IOException in case of an error
    */
-  @Test
-  public void blockReport_04() throws IOException {
+  private void blockReport_04(boolean splitBlockReports) throws IOException {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path filePath = new Path("/" + METHOD_NAME + ".dat");
     DFSTestUtil.createFile(fs, filePath,
@@ -352,11 +459,7 @@ public class TestBlockReport {
 
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
     StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
-    DatanodeCommand dnCmd =
-        cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Got the command: " + dnCmd);
-    }
+    sendBlockReports(dnR, poolId, reports, splitBlockReports);
     printStats();
 
     assertThat("Wrong number of corrupt blocks",
@@ -373,8 +476,7 @@ public class TestBlockReport {
    *
    * @throws IOException in case of an error
    */
-  @Test
-  public void blockReport_06() throws Exception {
+  private void blockReport_06(boolean splitBlockReports) throws Exception {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path filePath = new Path("/" + METHOD_NAME + ".dat");
     final int DN_N1 = DN_N0 + 1;
@@ -387,7 +489,7 @@ public class TestBlockReport {
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
     StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
-    cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
+    sendBlockReports(dnR, poolId, reports, splitBlockReports);
     printStats();
     assertEquals("Wrong number of PendingReplication Blocks",
       0, cluster.getNamesystem().getUnderReplicatedBlocks());
@@ -406,8 +508,7 @@ public class TestBlockReport {
    *
    * @throws IOException in case of an error
    */
-  @Test
-  public void blockReport_07() throws Exception {
+  private void blockReport_07(boolean splitBlockReports) throws Exception {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path filePath = new Path("/" + METHOD_NAME + ".dat");
     final int DN_N1 = DN_N0 + 1;
@@ -421,7 +522,7 @@ public class TestBlockReport {
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
     StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false);
-    cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
+    sendBlockReports(dnR, poolId, reports, splitBlockReports);
     printStats();
 
     assertThat("Wrong number of corrupt blocks",
@@ -432,7 +533,7 @@ public class TestBlockReport {
                cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));
 
     reports = getBlockReports(dn, poolId, true, true);
-    cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
+    sendBlockReports(dnR, poolId, reports, splitBlockReports);
     printStats();
 
     assertThat("Wrong number of corrupt blocks",
@@ -458,8 +559,7 @@ public class TestBlockReport {
    *
    * @throws IOException in case of an error
    */
-  @Test
-  public void blockReport_08() throws IOException {
+  private void blockReport_08(boolean splitBlockReports) throws IOException {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path filePath = new Path("/" + METHOD_NAME + ".dat");
     final int DN_N1 = DN_N0 + 1;
@@ -483,8 +583,8 @@ public class TestBlockReport {
       DataNode dn = cluster.getDataNodes().get(DN_N1);
       String poolId = cluster.getNamesystem().getBlockPoolId();
       DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-      StorageBlockReport[] report = getBlockReports(dn, poolId, false, false);
-      cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
+      StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
+      sendBlockReports(dnR, poolId, reports, splitBlockReports);
       printStats();
       assertEquals("Wrong number of PendingReplication blocks",
         blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks());
@@ -500,8 +600,7 @@ public class TestBlockReport {
   // Similar to BlockReport_08 but corrupts GS and len of the TEMPORARY's
   // replica block. Expect the same behaviour: NN should simply ignore this
   // block
-  @Test
-  public void blockReport_09() throws IOException {
+  private void blockReport_09(boolean splitBlockReports) throws IOException {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path filePath = new Path("/" + METHOD_NAME + ".dat");
     final int DN_N1 = DN_N0 + 1;
@@ -526,8 +625,8 @@ public class TestBlockReport {
       DataNode dn = cluster.getDataNodes().get(DN_N1);
       String poolId = cluster.getNamesystem().getBlockPoolId();
       DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-      StorageBlockReport[] report = getBlockReports(dn, poolId, true, true);
-      cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
+      StorageBlockReport[] reports = getBlockReports(dn, poolId, true, true);
+      sendBlockReports(dnR, poolId, reports, splitBlockReports);
       printStats();
       assertEquals("Wrong number of PendingReplication blocks",
         2, cluster.getNamesystem().getPendingReplicationBlocks());

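The Combined_/Split_ wrapper pairs above enumerate both variations by hand, which keeps each variation visible as its own JUnit method. The same coverage could alternatively be expressed with JUnit 4's Parameterized runner; a hedged sketch of that alternative (TestBlockReportVariations is a hypothetical name, and this is not how the patch is written):

    import java.util.Arrays;
    import java.util.Collection;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;
    import org.junit.runners.Parameterized.Parameters;

    @RunWith(Parameterized.class)
    public class TestBlockReportVariations {
      // Each row runs the whole class once: combined (false) and split (true).
      @Parameters
      public static Collection<Object[]> data() {
        return Arrays.asList(new Object[][] { { false }, { true } });
      }

      private final boolean splitBlockReports;

      public TestBlockReportVariations(boolean splitBlockReports) {
        this.splitBlockReports = splitBlockReports;
      }

      @Test
      public void blockReport_01() throws Exception {
        // Body would call sendBlockReports(dnR, poolId, reports,
        // splitBlockReports), exactly as the private helpers in the patch do.
      }
    }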
+ 213 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java

@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.*;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This test verifies that incremental block reports from a single DataNode are
+ * correctly handled by NN. Tests the following variations:
+ *  #1 - Incremental BRs from all storages combined in a single call.
+ *  #2 - Incremental BRs from separate storages sent in separate calls.
+ *
+ *  We also verify that the DataNode is not splitting the reports (it may do so
+ *  in the future).
+ */
+public class TestIncrementalBrVariations {
+  public static final Log LOG = LogFactory.getLog(TestIncrementalBrVariations.class);
+
+  private static short NUM_DATANODES = 1;
+  static final int BLOCK_SIZE = 1024;
+  static final int NUM_BLOCKS = 10;
+  private static final long seed = 0xFACEFEEDL;
+  private static final String NN_METRICS = "NameNodeActivity";
+
+
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private DFSClient client;
+  private static Configuration conf;
+
+  static {
+    ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) BlockManager.blockLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) TestIncrementalBrVariations.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  @Before
+  public void startUpCluster() throws IOException {
+    conf = new Configuration();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+    fs = cluster.getFileSystem();
+    client = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()),
+                           cluster.getConfiguration(0));
+  }
+
+  @After
+  public void shutDownCluster() throws IOException {
+    client.close();
+    fs.close();
+    cluster.shutdownDataNodes();
+    cluster.shutdown();
+  }
+
+  /**
+   * Incremental BRs from all storages combined in a single message.
+   */
+  @Test
+  public void testCombinedIncrementalBlockReport() throws IOException {
+    verifyIncrementalBlockReports(false);
+  }
+
+  /**
+   * One incremental BR per storage.
+   */
+  @Test
+  public void testSplitIncrementalBlockReport() throws IOException {
+    verifyIncrementalBlockReports(true);
+  }
+
+  private LocatedBlocks createFileGetBlocks(String filenamePrefix) throws IOException {
+    Path filePath = new Path("/" + filenamePrefix + ".dat");
+
+    // Write out a file with a few blocks, get block locations.
+    DFSTestUtil.createFile(fs, filePath, BLOCK_SIZE, BLOCK_SIZE * NUM_BLOCKS,
+                           BLOCK_SIZE, NUM_DATANODES, seed);
+
+    // Get the block list for the file with the block locations.
+    LocatedBlocks blocks = client.getLocatedBlocks(
+        filePath.toString(), 0, BLOCK_SIZE * NUM_BLOCKS);
+    assertThat(cluster.getNamesystem().getUnderReplicatedBlocks(), is(0L));
+    return blocks;
+  }
+
+  public void verifyIncrementalBlockReports(boolean splitReports) throws IOException {
+    // Get the block list for the file with the block locations.
+    LocatedBlocks blocks = createFileGetBlocks(GenericTestUtils.getMethodName());
+
+    // All blocks belong to the same file, hence the same BP.
+    DataNode dn = cluster.getDataNodes().get(0);
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+
+    // We will send 'fake' incremental block reports to the NN that look
+    // like they originated from DN 0.
+    StorageReceivedDeletedBlocks reports[] =
+        new StorageReceivedDeletedBlocks[dn.getFSDataset().getVolumes().size()];
+
+    // Lie to the NN that one block on each storage has been deleted.
+    for (int i = 0; i < reports.length; ++i) {
+      FsVolumeSpi volume = dn.getFSDataset().getVolumes().get(i);
+
+      boolean foundBlockOnStorage = false;
+      ReceivedDeletedBlockInfo rdbi[] = new ReceivedDeletedBlockInfo[1];
+
+      // Find the first block on this storage and mark it as deleted for the
+      // report.
+      for (LocatedBlock block : blocks.getLocatedBlocks()) {
+        if (block.getStorageIDs()[0].equals(volume.getStorageID())) {
+          rdbi[0] = new ReceivedDeletedBlockInfo(block.getBlock().getLocalBlock(),
+              ReceivedDeletedBlockInfo.BlockStatus.DELETED_BLOCK, null);
+          foundBlockOnStorage = true;
+          break;
+        }
+      }
+
+      assertTrue(foundBlockOnStorage);
+      reports[i] = new StorageReceivedDeletedBlocks(volume.getStorageID(), rdbi);
+
+      if (splitReports) {
+        // If we are splitting reports then send the report for this storage now.
+        StorageReceivedDeletedBlocks singletonReport[] = { reports[i] };
+        cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, singletonReport);
+      }
+    }
+
+    if (!splitReports) {
+      // Send a combined report.
+      cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, reports);
+    }
+
+    // Make sure that the deleted block from each storage was picked up
+    // by the NameNode.
+    assertThat(cluster.getNamesystem().getMissingBlocksCount(), is((long) reports.length));
+  }
+
+  /**
+   * Verify that the DataNode sends a single incremental block report for all
+   * storages.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=60000)
+  public void testDataNodeDoesNotSplitReports()
+      throws IOException, InterruptedException {
+    LocatedBlocks blocks = createFileGetBlocks(GenericTestUtils.getMethodName());
+    assertThat(cluster.getDataNodes().size(), is(1));
+    DataNode dn = cluster.getDataNodes().get(0);
+
+    // Remove all blocks from the DataNode.
+    for (LocatedBlock block : blocks.getLocatedBlocks()) {
+      dn.notifyNamenodeDeletedBlock(
+          block.getBlock(), block.getStorageIDs()[0]);
+    }
+
+    LOG.info("Triggering report after deleting blocks");
+    long ops = getLongCounter("BlockReceivedAndDeletedOps", getMetrics(NN_METRICS));
+
+    // Trigger a report to the NameNode and give it a few seconds.
+    DataNodeTestUtils.triggerBlockReport(dn);
+    Thread.sleep(5000);
+
+    // Ensure that NameNodeRpcServer.blockReceivedAndDeleted is invoked
+    // exactly once after we triggered the report.
+    assertCounter("BlockReceivedAndDeletedOps", ops+1, getMetrics(NN_METRICS));
+  }
+}