Browse Source

HDFS-6094. The same block can be counted twice towards safe mode threshold. (Arpit Agarwal)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1578478 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal 11 years ago
parent
commit
809e8bf5b7
14 changed files with 162 additions and 54 deletions
  1. 3 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
  3. 6 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
  4. 15 9
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  5. 13 7
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
  6. 7 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
  7. 15 13
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
  8. 4 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
  9. 9 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
  10. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  11. 16 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReceivedDeletedBlocks.java
  12. 2 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
  13. 7 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
  14. 62 14
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -623,6 +623,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-6102. Lower the default maximum items per directory to fix PB fsimage
     loading. (wang)
 
+    HDFS-6094. The same block can be counted twice towards safe mode
+    threshold. (Arpit Agarwal)
+
   BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
 
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java

@@ -252,7 +252,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
     for (StorageReceivedDeletedBlocks storageBlock : receivedAndDeletedBlocks) {
       StorageReceivedDeletedBlocksProto.Builder repBuilder = 
           StorageReceivedDeletedBlocksProto.newBuilder();
-      repBuilder.setStorageUuid(storageBlock.getStorageID());
+      repBuilder.setStorageUuid(storageBlock.getStorage().getStorageID());  // Set for wire compatibility.
+      repBuilder.setStorage(PBHelper.convert(storageBlock.getStorage()));
       for (ReceivedDeletedBlockInfo rdBlock : storageBlock.getBlocks()) {
         repBuilder.addBlocks(PBHelper.convert(rdBlock));
       }

+ 6 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java

@@ -198,7 +198,12 @@ public class DatanodeProtocolServerSideTranslatorPB implements
       for (int j = 0; j < list.size(); j++) {
         rdBlocks[j] = PBHelper.convert(list.get(j));
       }
-      info[i] = new StorageReceivedDeletedBlocks(sBlock.getStorageUuid(), rdBlocks);
+      if (sBlock.hasStorage()) {
+        info[i] = new StorageReceivedDeletedBlocks(
+            PBHelper.convert(sBlock.getStorage()), rdBlocks);
+      } else {
+        info[i] = new StorageReceivedDeletedBlocks(sBlock.getStorageUuid(), rdBlocks);
+      }
     }
     try {
       impl.blockReceivedAndDeleted(PBHelper.convert(request.getRegistration()),

+ 15 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -1668,9 +1668,6 @@ public class BlockManager {
       if (storageInfo == null) {
         // We handle this for backwards compatibility.
         storageInfo = node.updateStorage(storage);
-        LOG.warn("Unknown storageId " + storage.getStorageID() +
-                    ", updating storageMap. This indicates a buggy " +
-                    "DataNode that isn't heartbeating correctly.");
       }
       if (namesystem.isInStartupSafeMode()
           && storageInfo.getBlockReportCount() > 0) {
@@ -2277,7 +2274,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
     if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
         numLiveReplicas >= minReplication) {
       storedBlock = completeBlock(bc, storedBlock, false);
-    } else if (storedBlock.isComplete()) {
+    } else if (storedBlock.isComplete() && added) {
       // check whether safe replication is reached for the block
       // only complete blocks are counted towards that
       // Is no-op if not in safe mode.
@@ -2858,8 +2855,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
    * This method must be called with FSNamesystem lock held.
    */
   public void processIncrementalBlockReport(final DatanodeID nodeID,
-      final String poolId, final StorageReceivedDeletedBlocks srdb)
-      throws IOException {
+      final StorageReceivedDeletedBlocks srdb) throws IOException {
     assert namesystem.hasWriteLock();
     int received = 0;
     int deleted = 0;
@@ -2874,6 +2870,15 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
           "Got incremental block report from unregistered or dead node");
     }
 
+    if (node.getStorageInfo(srdb.getStorage().getStorageID()) == null) {
+      // The DataNode is reporting an unknown storage. Usually the NN learns
+      // about new storages from heartbeats but during NN restart we may
+      // receive a block report or incremental report before the heartbeat.
+      // We must handle this for protocol compatibility. This issue was
+      // uncovered by HDFS-6904.
+      node.updateStorage(srdb.getStorage());
+    }
+
     for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
       switch (rdbi.getStatus()) {
       case DELETED_BLOCK:
@@ -2881,13 +2886,14 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
         deleted++;
         break;
       case RECEIVED_BLOCK:
-        addBlock(node, srdb.getStorageID(), rdbi.getBlock(), rdbi.getDelHints());
+        addBlock(node, srdb.getStorage().getStorageID(),
+            rdbi.getBlock(), rdbi.getDelHints());
         received++;
         break;
       case RECEIVING_BLOCK:
         receiving++;
-        processAndHandleReportedBlock(node, srdb.getStorageID(), rdbi.getBlock(),
-            ReplicaState.RBW, null);
+        processAndHandleReportedBlock(node, srdb.getStorage().getStorageID(),
+            rdbi.getBlock(), ReplicaState.RBW, null);
         break;
       default:
         String msg = 

+ 13 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java

@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -245,7 +246,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return false;
   }
 
-  DatanodeStorageInfo getStorageInfo(String storageID) {
+  @VisibleForTesting
+  public DatanodeStorageInfo getStorageInfo(String storageID) {
     synchronized (storageMap) {
       return storageMap.get(storageID);
     }
@@ -366,12 +368,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
     setLastUpdate(Time.now());    
     this.volumeFailures = volFailures;
     for (StorageReport report : reports) {
-      DatanodeStorageInfo storage = storageMap.get(report.getStorage().getStorageID());
-      if (storage == null) {
-        // This is seen during cluster initialization when the heartbeat
-        // is received before the initial block reports from each storage.
-        storage = updateStorage(report.getStorage());
-      }
+      DatanodeStorageInfo storage = updateStorage(report.getStorage());
       storage.receivedHeartbeat(report);
       totalCapacity += report.getCapacity();
       totalRemaining += report.getRemaining();
@@ -669,6 +666,15 @@ public class DatanodeDescriptor extends DatanodeInfo {
                  " for DN " + getXferAddr());
         storage = new DatanodeStorageInfo(this, s);
         storageMap.put(s.getStorageID(), storage);
+      } else if (storage.getState() != s.getState() ||
+                 storage.getStorageType() != s.getStorageType()) {
+        // For backwards compatibility, make sure that the type and
+        // state are updated. Some reports from older datanodes do
+        // not include these fields so we may have assumed defaults.
+        // This check can be removed in the next major release after
+        // 2.4.
+        storage.updateFromStorage(s);
+        storageMap.put(storage.getStorageID(), storage);
       }
       return storage;
     }

+ 7 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java

@@ -71,6 +71,11 @@ public class DatanodeStorageInfo {
     return storageTypes;
   }
 
+  public void updateFromStorage(DatanodeStorage storage) {
+    state = storage.getState();
+    storageType = storage.getStorageType();
+  }
+
   /**
    * Iterates over the list of blocks belonging to the data-node.
    */
@@ -98,8 +103,8 @@ public class DatanodeStorageInfo {
 
   private final DatanodeDescriptor dn;
   private final String storageID;
-  private final StorageType storageType;
-  private final State state;
+  private StorageType storageType;
+  private State state;
 
   private long capacity;
   private long dfsUsed;

+ 15 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -98,7 +98,7 @@ class BPServiceActor implements Runnable {
    * keyed by block ID, contains the pending changes which have yet to be
    * reported to the NN. Access should be synchronized on this object.
    */
-  private final Map<String, PerStoragePendingIncrementalBR>
+  private final Map<DatanodeStorage, PerStoragePendingIncrementalBR>
       pendingIncrementalBRperStorage = Maps.newHashMap();
 
   // IBR = Incremental Block Report. If this flag is set then an IBR will be
@@ -270,15 +270,15 @@ class BPServiceActor implements Runnable {
     ArrayList<StorageReceivedDeletedBlocks> reports =
         new ArrayList<StorageReceivedDeletedBlocks>(pendingIncrementalBRperStorage.size());
     synchronized (pendingIncrementalBRperStorage) {
-      for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
+      for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
            pendingIncrementalBRperStorage.entrySet()) {
-        final String storageUuid = entry.getKey();
+        final DatanodeStorage storage = entry.getKey();
         final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();
 
         if (perStorageMap.getBlockInfoCount() > 0) {
           // Send newly-received and deleted blockids to namenode
           ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
-          reports.add(new StorageReceivedDeletedBlocks(storageUuid, rdbi));
+          reports.add(new StorageReceivedDeletedBlocks(storage, rdbi));
         }
       }
       sendImmediateIBR = false;
@@ -304,7 +304,7 @@ class BPServiceActor implements Runnable {
             // blocks back onto our queue, but only in the case where we
             // didn't put something newer in the meantime.
             PerStoragePendingIncrementalBR perStorageMap =
-                pendingIncrementalBRperStorage.get(report.getStorageID());
+                pendingIncrementalBRperStorage.get(report.getStorage());
             perStorageMap.putMissingBlockInfos(report.getBlocks());
             sendImmediateIBR = true;
           }
@@ -319,16 +319,16 @@ class BPServiceActor implements Runnable {
    * @return
    */
   private PerStoragePendingIncrementalBR getIncrementalBRMapForStorage(
-      String storageUuid) {
+      DatanodeStorage storage) {
     PerStoragePendingIncrementalBR mapForStorage =
-        pendingIncrementalBRperStorage.get(storageUuid);
+        pendingIncrementalBRperStorage.get(storage);
 
     if (mapForStorage == null) {
       // This is the first time we are adding incremental BR state for
       // this storage so create a new map. This is required once per
       // storage, per service actor.
       mapForStorage = new PerStoragePendingIncrementalBR();
-      pendingIncrementalBRperStorage.put(storageUuid, mapForStorage);
+      pendingIncrementalBRperStorage.put(storage, mapForStorage);
     }
 
     return mapForStorage;
@@ -343,16 +343,16 @@ class BPServiceActor implements Runnable {
    * @param storageUuid
    */
   void addPendingReplicationBlockInfo(ReceivedDeletedBlockInfo bInfo,
-      String storageUuid) {
+      DatanodeStorage storage) {
     // Make sure another entry for the same block is first removed.
     // There may only be one such entry.
-    for (Map.Entry<String, PerStoragePendingIncrementalBR> entry :
+    for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
           pendingIncrementalBRperStorage.entrySet()) {
       if (entry.getValue().removeBlockInfo(bInfo)) {
         break;
       }
     }
-    getIncrementalBRMapForStorage(storageUuid).putBlockInfo(bInfo);
+    getIncrementalBRMapForStorage(storage).putBlockInfo(bInfo);
   }
 
   /*
@@ -363,7 +363,8 @@ class BPServiceActor implements Runnable {
   void notifyNamenodeBlockImmediately(
       ReceivedDeletedBlockInfo bInfo, String storageUuid) {
     synchronized (pendingIncrementalBRperStorage) {
-      addPendingReplicationBlockInfo(bInfo, storageUuid);
+      addPendingReplicationBlockInfo(
+          bInfo, dn.getFSDataset().getStorage(storageUuid));
       sendImmediateIBR = true;
       pendingIncrementalBRperStorage.notifyAll();
     }
@@ -372,7 +373,8 @@ class BPServiceActor implements Runnable {
   void notifyNamenodeDeletedBlock(
       ReceivedDeletedBlockInfo bInfo, String storageUuid) {
     synchronized (pendingIncrementalBRperStorage) {
-      addPendingReplicationBlockInfo(bInfo, storageUuid);
+      addPendingReplicationBlockInfo(
+          bInfo, dn.getFSDataset().getStorage(storageUuid));
     }
   }
 

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

@@ -79,7 +79,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
 
   /**
    * Create rolling logs.
-   * 
+   *
    * @param prefix the prefix of the log names.
    * @return rolling logs
    */
@@ -89,6 +89,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   /** @return a list of volumes. */
   public List<V> getVolumes();
 
+  /** @return a storage with the given storage ID */
+  public DatanodeStorage getStorage(final String storageUuid);
+
   /** @return one or more storage reports for attached volumes. */
   public StorageReport[] getStorageReports(String bpid)
       throws IOException;

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -75,6 +75,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return volumes.volumes;
   }
 
+  @Override
+  public DatanodeStorage getStorage(final String storageUuid) {
+    return storageMap.get(storageUuid);
+  }
+
   @Override // FsDatasetSpi
   public StorageReport[] getStorageReports(String bpid)
       throws IOException {
@@ -157,6 +162,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   final DataNode datanode;
   final DataStorage dataStorage;
   final FsVolumeList volumes;
+  final Map<String, DatanodeStorage> storageMap;
   final FsDatasetAsyncDiskService asyncDiskService;
   final FsDatasetCache cacheManager;
   private final int validVolsRequired;
@@ -198,6 +204,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           + ", volume failures tolerated: " + volFailuresTolerated);
     }
 
+    storageMap = new HashMap<String, DatanodeStorage>();
     final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
         storage.getNumStorageDirs());
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
@@ -207,6 +214,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       volArray.add(new FsVolumeImpl(this, sd.getStorageUuid(), dir, conf,
           storageType));
       LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
+      storageMap.put(sd.getStorageUuid(),
+          new DatanodeStorage(sd.getStorageUuid(), DatanodeStorage.State.NORMAL, storageType));
     }
     volumeMap = new ReplicaMap(this);
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -5402,7 +5402,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throws IOException {
     writeLock();
     try {
-      blockManager.processIncrementalBlockReport(nodeID, poolId, srdb);
+      blockManager.processIncrementalBlockReport(nodeID, srdb);
     } finally {
       writeUnlock();
     }

+ 16 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageReceivedDeletedBlocks.java

@@ -23,20 +23,32 @@ package org.apache.hadoop.hdfs.server.protocol;
  * storage.
  */
 public class StorageReceivedDeletedBlocks {
-  private final String storageID;
+  DatanodeStorage storage;
   private final ReceivedDeletedBlockInfo[] blocks;
-  
+
+  @Deprecated
   public String getStorageID() {
-    return storageID;
+    return storage.getStorageID();
+  }
+
+  public DatanodeStorage getStorage() {
+    return storage;
   }
 
   public ReceivedDeletedBlockInfo[] getBlocks() {
     return blocks;
   }
 
+  @Deprecated
   public StorageReceivedDeletedBlocks(final String storageID,
       final ReceivedDeletedBlockInfo[] blocks) {
-    this.storageID = storageID;
+    this.storage = new DatanodeStorage(storageID);
+    this.blocks = blocks;
+  }
+
+  public StorageReceivedDeletedBlocks(final DatanodeStorage storage,
+      final ReceivedDeletedBlockInfo[] blocks) {
+    this.storage = storage;
     this.blocks = blocks;
   }
 }

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto

@@ -289,8 +289,9 @@ message ReceivedDeletedBlockInfoProto {
  * List of blocks received and deleted for a storage.
  */
 message StorageReceivedDeletedBlocksProto {
-  required string storageUuid = 1;
+  required string storageUuid = 1 [ deprecated = true ];
   repeated ReceivedDeletedBlockInfoProto blocks = 2;
+  optional DatanodeStorageProto storage = 3;   // supersedes storageUuid.
 }
 
 /**

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -1084,6 +1084,13 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public DatanodeStorage getStorage(final String storageUuid) {
+    return storageUuid.equals(storage.getStorageUuid()) ?
+        storage.dnStorage :
+        null;
+  }
+
   @Override
   public StorageReport[] getStorageReports(String bpid) {
     return new StorageReport[] {storage.getStorageReport(bpid)};

+ 62 - 14
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java

@@ -21,11 +21,11 @@ import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,10 +38,12 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.*;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
@@ -53,6 +55,7 @@ import org.junit.Test;
  * correctly handled by NN. Tests the following variations:
  *  #1 - Incremental BRs from all storages combined in a single call.
  *  #2 - Incremental BRs from separate storages sent in separate calls.
+ *  #3 - Incremental BR from an unknown storage should be rejected.
  *
  *  We also verify that the DataNode is not splitting the reports (it may do so
  *  in the future).
@@ -71,6 +74,9 @@ public class TestIncrementalBrVariations {
   private DistributedFileSystem fs;
   private DFSClient client;
   private static Configuration conf;
+  private String poolId;
+  private DataNode dn0;           // DataNode at index0 in the MiniDFSCluster
+  private DatanodeRegistration dn0Reg;  // DataNodeRegistration for dn0
 
   static {
     ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
@@ -88,6 +94,9 @@ public class TestIncrementalBrVariations {
     fs = cluster.getFileSystem();
     client = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()),
                            cluster.getConfiguration(0));
+    dn0 = cluster.getDataNodes().get(0);
+    poolId = cluster.getNamesystem().getBlockPoolId();
+    dn0Reg = dn0.getDNRegistrationForBP(poolId);
   }
 
   @After
@@ -132,19 +141,14 @@ public class TestIncrementalBrVariations {
     // Get the block list for the file with the block locations.
     LocatedBlocks blocks = createFileGetBlocks(GenericTestUtils.getMethodName());
 
-    // A blocks belong to the same file, hence same BP
-    DataNode dn = cluster.getDataNodes().get(0);
-    String poolId = cluster.getNamesystem().getBlockPoolId();
-    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-
     // We will send 'fake' incremental block reports to the NN that look
     // like they originated from DN 0.
     StorageReceivedDeletedBlocks reports[] =
-        new StorageReceivedDeletedBlocks[dn.getFSDataset().getVolumes().size()];
+        new StorageReceivedDeletedBlocks[dn0.getFSDataset().getVolumes().size()];
 
     // Lie to the NN that one block on each storage has been deleted.
     for (int i = 0; i < reports.length; ++i) {
-      FsVolumeSpi volume = dn.getFSDataset().getVolumes().get(i);
+      FsVolumeSpi volume = dn0.getFSDataset().getVolumes().get(i);
 
       boolean foundBlockOnStorage = false;
       ReceivedDeletedBlockInfo rdbi[] = new ReceivedDeletedBlockInfo[1];
@@ -166,13 +170,14 @@ public class TestIncrementalBrVariations {
       if (splitReports) {
         // If we are splitting reports then send the report for this storage now.
         StorageReceivedDeletedBlocks singletonReport[] = { reports[i] };
-        cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, singletonReport);
+        cluster.getNameNodeRpc().blockReceivedAndDeleted(
+            dn0Reg, poolId, singletonReport);
       }
     }
 
     if (!splitReports) {
       // Send a combined report.
-      cluster.getNameNodeRpc().blockReceivedAndDeleted(dnR, poolId, reports);
+      cluster.getNameNodeRpc().blockReceivedAndDeleted(dn0Reg, poolId, reports);
     }
 
     // Make sure that the deleted block from each storage was picked up
@@ -191,11 +196,10 @@ public class TestIncrementalBrVariations {
       throws IOException, InterruptedException {
     LocatedBlocks blocks = createFileGetBlocks(GenericTestUtils.getMethodName());
     assertThat(cluster.getDataNodes().size(), is(1));
-    DataNode dn = cluster.getDataNodes().get(0);
 
     // Remove all blocks from the DataNode.
     for (LocatedBlock block : blocks.getLocatedBlocks()) {
-      dn.notifyNamenodeDeletedBlock(
+      dn0.notifyNamenodeDeletedBlock(
           block.getBlock(), block.getStorageIDs()[0]);
     }
 
@@ -203,11 +207,55 @@ public class TestIncrementalBrVariations {
     long ops = getLongCounter("BlockReceivedAndDeletedOps", getMetrics(NN_METRICS));
 
     // Trigger a report to the NameNode and give it a few seconds.
-    DataNodeTestUtils.triggerBlockReport(dn);
+    DataNodeTestUtils.triggerBlockReport(dn0);
     Thread.sleep(5000);
 
     // Ensure that NameNodeRpcServer.blockReceivedAndDeletes is invoked
     // exactly once after we triggered the report.
     assertCounter("BlockReceivedAndDeletedOps", ops+1, getMetrics(NN_METRICS));
   }
+
+  private static Block getDummyBlock() {
+    return new Block(10000000L, 100L, 1048576L);
+  }
+
+  private static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
+      Block block, DatanodeStorage storage) {
+    ReceivedDeletedBlockInfo[] receivedBlocks = new ReceivedDeletedBlockInfo[1];
+    receivedBlocks[0] = new ReceivedDeletedBlockInfo(block, BlockStatus.RECEIVED_BLOCK, null);
+    StorageReceivedDeletedBlocks[] reports = new StorageReceivedDeletedBlocks[1];
+    reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
+    return reports;
+  }
+
+  /**
+   * Verify that the NameNode can learn about new storages from incremental
+   * block reports.
+   * This tests the fix for the error condition seen in HDFS-6904.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test (timeout=60000)
+  public void testNnLearnsNewStorages()
+      throws IOException, InterruptedException {
+
+    // Generate a report for a fake block on a fake storage.
+    final String newStorageUuid = UUID.randomUUID().toString();
+    final DatanodeStorage newStorage = new DatanodeStorage(newStorageUuid);
+    StorageReceivedDeletedBlocks[] reports = makeReportForReceivedBlock(
+        getDummyBlock(), newStorage);
+
+    // Send the report to the NN.
+    cluster.getNameNodeRpc().blockReceivedAndDeleted(dn0Reg, poolId, reports);
+
+    // Make sure that the NN has learned of the new storage.
+    DatanodeStorageInfo storageInfo = cluster.getNameNode()
+                                             .getNamesystem()
+                                             .getBlockManager()
+                                             .getDatanodeManager()
+                                             .getDatanode(dn0.getDatanodeId())
+                                             .getStorageInfo(newStorageUuid);
+    assertNotNull(storageInfo);
+  }
 }