
HDFS-7928. Scanning blocks from disk during rolling upgrade startup takes a lot of time if disks are busy. Contributed by Rushabh Shah.

Kihwal Lee 10 years ago
parent
commit
fc1031af74
13 changed files with 430 additions and 79 deletions
  1. hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (+3, -0)
  2. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java (+37, -0)
  3. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (+5, -2)
  4. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java (+200, -68)
  5. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (+2, -1)
  6. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java (+4, -4)
  7. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java (+5, -2)
  8. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java (+3, -2)
  9. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java (+7, -0)
  10. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java (+152, -0)
  11. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java (+6, -0)
  12. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java (+3, -0)
  13. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java (+3, -0)

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -339,6 +339,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-7713. Implement mkdirs in the HDFS Web UI. (Ravi Prakash via wheat9)
 
+    HDFS-7928. Scanning blocks from disk during rolling upgrade startup takes
+    a lot of time if disks are busy (Rushabh S Shah via kihwal)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 37 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java

@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.protocol;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -33,6 +35,7 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.WireFormat;
 
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -108,6 +111,40 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
     return builder.build();
   }
 
+  public static BlockListAsLongs readFrom(InputStream is) throws IOException {
+    CodedInputStream cis = CodedInputStream.newInstance(is);
+    int numBlocks = -1;
+    ByteString blocksBuf = null;
+    while (!cis.isAtEnd()) {
+      int tag = cis.readTag();
+      int field = WireFormat.getTagFieldNumber(tag);
+      switch(field) {
+        case 0:
+          break;
+        case 1:
+          numBlocks = (int)cis.readInt32();
+          break;
+        case 2:
+          blocksBuf = cis.readBytes();
+          break;
+        default:
+          cis.skipField(tag);
+          break;
+      }
+    }
+    if (numBlocks != -1 && blocksBuf != null) {
+      return decodeBuffer(numBlocks, blocksBuf);
+    }
+    return null;
+  }
+
+  public void writeTo(OutputStream os) throws IOException {
+    CodedOutputStream cos = CodedOutputStream.newInstance(os);
+    cos.writeInt32(1, getNumberOfBlocks());
+    cos.writeBytes(2, getBlocksBuffer());
+    cos.flush();
+  }
+  
   public static Builder builder() {
     return new BlockListAsLongs.Builder();
   }
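
Not part of the patch: the two helpers above are enough to round-trip a block report through a plain stream. A minimal sketch, assuming the caller already has a BlockListAsLongs in hand and using an illustrative cache file path; in the patch itself the caller is BlockPoolSlice, which writes a "replicas" file under the block pool's current/ directory.

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;

    class BlockListRoundTrip {
      // Writes the report with writeTo() and reads it back with readFrom().
      static BlockListAsLongs roundTrip(BlockListAsLongs blocks, File cacheFile)
          throws IOException {
        try (OutputStream out = new FileOutputStream(cacheFile)) {
          blocks.writeTo(out);                  // field 1: block count, field 2: encoded block buffer
        }
        try (InputStream in = new FileInputStream(cacheFile)) {
          return BlockListAsLongs.readFrom(in); // returns null if either field is missing
        }
      }
    }

Note that readFrom() returns null rather than throwing when the count or the buffer is absent, so callers need to handle a null result; in the patch, readReplicasFromCache() catches any exception raised while parsing the cache and falls back to the on-disk scan.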

+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -41,8 +41,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMOR
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
@@ -159,6 +157,7 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -2500,6 +2499,10 @@ public class DataNode extends ReconfigurableBase
     return blockScanner;
   }
 
+  @VisibleForTesting
+  DirectoryScanner getDirectoryScanner() {
+    return directoryScanner;
+  }
 
   public static void secureMain(String args[], SecureResources resources) {
     int errorCode = 0;

+ 200 - 68
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java

@@ -23,12 +23,12 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStreamWriter;
 import java.io.RandomAccessFile;
 import java.io.Writer;
+import java.util.Iterator;
 import java.util.Scanner;
 
 import org.apache.commons.io.FileUtils;
@@ -39,6 +39,8 @@ import org.apache.hadoop.fs.DU;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
@@ -55,6 +57,7 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.io.Files;
 /**
  * A block pool slice represents a portion of a block pool stored on a volume.  
  * Taken together, all BlockPoolSlices sharing a block pool ID across a 
@@ -77,7 +80,9 @@ class BlockPoolSlice {
   private volatile boolean dfsUsedSaved = false;
   private static final int SHUTDOWN_HOOK_PRIORITY = 30;
   private final boolean deleteDuplicateReplicas;
-  
+  private static final String REPLICA_CACHE_FILE = "replicas";
+  private final long replicaCacheExpiry = 5*60*1000;
+
   // TODO:FEDERATION scalability issue - a thread per DU is needed
   private final DU dfsUsage;
 
@@ -310,11 +315,14 @@ class BlockPoolSlice {
       FsDatasetImpl.LOG.info(
           "Recovered " + numRecovered + " replicas from " + lazypersistDir);
     }
-
-    // add finalized replicas
-    addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
-    // add rbw replicas
-    addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
+    
+    boolean  success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap);
+    if (!success) {
+      // add finalized replicas
+      addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
+      // add rbw replicas
+      addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
+    }
   }
 
   /**
@@ -401,6 +409,75 @@ class BlockPoolSlice {
     FileUtil.fullyDelete(source);
     return numRecovered;
   }
+  
+  private void addReplicaToReplicasMap(Block block, ReplicaMap volumeMap,
+      final RamDiskReplicaTracker lazyWriteReplicaMap,boolean isFinalized)
+      throws IOException {
+    ReplicaInfo newReplica = null;
+    long blockId = block.getBlockId();
+    long genStamp = block.getGenerationStamp();
+    if (isFinalized) {
+      newReplica = new FinalizedReplica(blockId, 
+          block.getNumBytes(), genStamp, volume, DatanodeUtil
+          .idToBlockDir(finalizedDir, blockId));
+    } else {
+      File file = new File(rbwDir, block.getBlockName());
+      boolean loadRwr = true;
+      File restartMeta = new File(file.getParent()  +
+          File.pathSeparator + "." + file.getName() + ".restart");
+      Scanner sc = null;
+      try {
+        sc = new Scanner(restartMeta, "UTF-8");
+        // The restart meta file exists
+        if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
+          // It didn't expire. Load the replica as a RBW.
+          // We don't know the expected block length, so just use 0
+          // and don't reserve any more space for writes.
+          newReplica = new ReplicaBeingWritten(blockId,
+              validateIntegrityAndSetLength(file, genStamp), 
+              genStamp, volume, file.getParentFile(), null, 0);
+          loadRwr = false;
+        }
+        sc.close();
+        if (!restartMeta.delete()) {
+          FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " +
+              restartMeta.getPath());
+        }
+      } catch (FileNotFoundException fnfe) {
+        // nothing to do here
+      } finally {
+        if (sc != null) {
+          sc.close();
+        }
+      }
+      // Restart meta doesn't exist or expired.
+      if (loadRwr) {
+        newReplica = new ReplicaWaitingToBeRecovered(blockId,
+            validateIntegrityAndSetLength(file, genStamp),
+            genStamp, volume, file.getParentFile());
+      }
+    }
+
+    ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
+    if (oldReplica == null) {
+      volumeMap.add(bpid, newReplica);
+    } else {
+      // We have multiple replicas of the same block so decide which one
+      // to keep.
+      newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap);
+    }
+
+    // If we are retaining a replica on transient storage make sure
+    // it is in the lazyWriteReplicaMap so it can be persisted
+    // eventually.
+    if (newReplica.getVolume().isTransientStorage()) {
+      lazyWriteReplicaMap.addReplica(bpid, blockId,
+          (FsVolumeImpl) newReplica.getVolume());
+    } else {
+      lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
+    }
+  }
+  
 
   /**
    * Add replicas under the given directory to the volume map
@@ -434,66 +511,9 @@ class BlockPoolSlice {
       long genStamp = FsDatasetUtil.getGenerationStampFromFile(
           files, file);
       long blockId = Block.filename2id(file.getName());
-      ReplicaInfo newReplica = null;
-      if (isFinalized) {
-        newReplica = new FinalizedReplica(blockId, 
-            file.length(), genStamp, volume, file.getParentFile());
-      } else {
-
-        boolean loadRwr = true;
-        File restartMeta = new File(file.getParent()  +
-            File.pathSeparator + "." + file.getName() + ".restart");
-        Scanner sc = null;
-        try {
-          sc = new Scanner(restartMeta, "UTF-8");
-          // The restart meta file exists
-          if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
-            // It didn't expire. Load the replica as a RBW.
-            // We don't know the expected block length, so just use 0
-            // and don't reserve any more space for writes.
-            newReplica = new ReplicaBeingWritten(blockId,
-                validateIntegrityAndSetLength(file, genStamp),
-                genStamp, volume, file.getParentFile(), null, 0);
-            loadRwr = false;
-          }
-          sc.close();
-          if (!restartMeta.delete()) {
-            FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " +
-              restartMeta.getPath());
-          }
-        } catch (FileNotFoundException fnfe) {
-          // nothing to do hereFile dir =
-        } finally {
-          if (sc != null) {
-            sc.close();
-          }
-        }
-        // Restart meta doesn't exist or expired.
-        if (loadRwr) {
-          newReplica = new ReplicaWaitingToBeRecovered(blockId,
-              validateIntegrityAndSetLength(file, genStamp),
-              genStamp, volume, file.getParentFile());
-        }
-      }
-
-      ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
-      if (oldReplica == null) {
-        volumeMap.add(bpid, newReplica);
-      } else {
-        // We have multiple replicas of the same block so decide which one
-        // to keep.
-        newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap);
-      }
-
-      // If we are retaining a replica on transient storage make sure
-      // it is in the lazyWriteReplicaMap so it can be persisted
-      // eventually.
-      if (newReplica.getVolume().isTransientStorage()) {
-        lazyWriteReplicaMap.addReplica(bpid, blockId,
-                                       (FsVolumeImpl) newReplica.getVolume());
-      } else {
-        lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
-      }
+      Block block = new Block(blockId, file.length(), genStamp); 
+      addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap, 
+          isFinalized);
     }
   }
 
@@ -649,9 +669,121 @@ class BlockPoolSlice {
     return currentDir.getAbsolutePath();
   }
   
-  void shutdown() {
+  void shutdown(BlockListAsLongs blocksListToPersist) {
+    saveReplicas(blocksListToPersist);
     saveDfsUsed();
     dfsUsedSaved = true;
     dfsUsage.shutdown();
   }
+
+  private boolean readReplicasFromCache(ReplicaMap volumeMap,
+      final RamDiskReplicaTracker lazyWriteReplicaMap) {
+    ReplicaMap tmpReplicaMap = new ReplicaMap(this);
+    File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
+    // Check whether the file exists or not.
+    if (!replicaFile.exists()) {
+      LOG.info("Replica Cache file: "+  replicaFile.getPath() + 
+          " doesn't exist ");
+      return false;
+    }
+    long fileLastModifiedTime = replicaFile.lastModified();
+    if (System.currentTimeMillis() > fileLastModifiedTime + replicaCacheExpiry) {
+      LOG.info("Replica Cache file: " + replicaFile.getPath() + 
+          " has gone stale");
+      // Just to make findbugs happy
+      if (!replicaFile.delete()) {
+        LOG.info("Replica Cache file: " + replicaFile.getPath() + 
+            " cannot be deleted");
+      }
+      return false;
+    }
+    FileInputStream inputStream = null;
+    try {
+      inputStream = new FileInputStream(replicaFile);
+      BlockListAsLongs blocksList =  BlockListAsLongs.readFrom(inputStream);
+      Iterator<BlockReportReplica> iterator = blocksList.iterator();
+      while (iterator.hasNext()) {
+        BlockReportReplica replica = iterator.next();
+        switch (replica.getState()) {
+        case FINALIZED:
+          addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, true);
+          break;
+        case RUR:
+        case RBW:
+        case RWR:
+          addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, false);
+          break;
+        default:
+          break;
+        }
+      }
+      inputStream.close();
+      // Now it is safe to add the replica into volumeMap
+      // In case of any exception during parsing this cache file, fall back
+      // to scan all the files on disk.
+      for (ReplicaInfo info: tmpReplicaMap.replicas(bpid)) {
+        volumeMap.add(bpid, info);
+      }
+      LOG.info("Successfully read replica from cache file : " 
+          + replicaFile.getPath());
+      return true;
+    } catch (Exception e) {
+      // Any exception we need to revert back to read from disk
+      // Log the error and return false
+      LOG.info("Exception occured while reading the replicas cache file: "
+          + replicaFile.getPath(), e );
+      return false;
+    }
+    finally {
+      if (!replicaFile.delete()) {
+        LOG.info("Failed to delete replica cache file: " +
+            replicaFile.getPath());
+      }
+      // close the inputStream
+      IOUtils.closeStream(inputStream);
+    }
+  } 
+  
+  private void saveReplicas(BlockListAsLongs blocksListToPersist) {
+    if (blocksListToPersist == null || 
+        blocksListToPersist.getNumberOfBlocks()== 0) {
+      return;
+    }
+    File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp");
+    if (tmpFile.exists() && !tmpFile.delete()) {
+      LOG.warn("Failed to delete tmp replicas file in " +
+        tmpFile.getPath());
+      return;
+    }
+    File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE);
+    if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
+      LOG.warn("Failed to delete replicas file in " +
+          replicaCacheFile.getPath());
+      return;
+    }
+    
+    FileOutputStream out = null;
+    try {
+      out = new FileOutputStream(tmpFile);
+      blocksListToPersist.writeTo(out);
+      out.close();
+      // Renaming the tmp file to replicas
+      Files.move(tmpFile, replicaCacheFile);
+    } catch (Exception e) {
+      // If write failed, the volume might be bad. Since the cache file is
+      // not critical, log the error, delete both the files (tmp and cache)
+      // and continue.
+      LOG.warn("Failed to write replicas to cache ", e);
+      if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
+        LOG.warn("Failed to delete replicas file: " + 
+            replicaCacheFile.getPath());
+      }
+    } finally {
+      IOUtils.closeStream(out);
+      if (tmpFile.exists() && !tmpFile.delete()) {
+        LOG.warn("Failed to delete tmp file in " +
+            tmpFile.getPath());
+      }
+    }
+  }
 }
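
Not part of the patch: saveReplicas() above uses a write-to-temp-then-rename pattern so a reader never observes a half-written cache; if anything goes wrong, both files are removed and the next startup simply falls back to the disk scan. A standalone sketch of that pattern under the same assumptions (illustrative payload and directory; Guava's Files.move is the same call the patch uses):

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    import com.google.common.io.Files;

    class AtomicCacheWrite {
      static void writeCache(byte[] payload, File dir) throws IOException {
        File tmp = new File(dir, "replicas.tmp");
        File dest = new File(dir, "replicas");
        if (dest.exists() && !dest.delete()) {
          throw new IOException("Could not remove old cache file " + dest);
        }
        try (OutputStream out = new FileOutputStream(tmp)) {
          out.write(payload);   // write the complete payload to the temp file first
        }
        Files.move(tmp, dest);  // then move it into place so readers never see a partial file
      }
    }

The cache is intentionally disposable: readReplicasFromCache() deletes the file after reading it (and also when it is older than replicaCacheExpiry, i.e. five minutes), so it only short-circuits the scan for a quick restart such as a rolling upgrade.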

+ 2 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -2463,8 +2463,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   @Override
   public synchronized void shutdownBlockPool(String bpid) {
     LOG.info("Removing block pool " + bpid);
+    Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume =  getBlockReports(bpid);
     volumeMap.cleanUpBlockPool(bpid);
-    volumes.removeBlockPool(bpid);
+    volumes.removeBlockPool(bpid, blocksPerVolume);
   }
   
   /**

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java

@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
@@ -65,7 +66,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.util.Time;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -805,7 +805,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
     }
     Set<Entry<String, BlockPoolSlice>> set = bpSlices.entrySet();
     for (Entry<String, BlockPoolSlice> entry : set) {
-      entry.getValue().shutdown();
+      entry.getValue().shutdown(null);
     }
   }
 
@@ -815,10 +815,10 @@ public class FsVolumeImpl implements FsVolumeSpi {
     bpSlices.put(bpid, bp);
   }
   
-  void shutdownBlockPool(String bpid) {
+  void shutdownBlockPool(String bpid, BlockListAsLongs blocksListsAsLongs) {
     BlockPoolSlice bp = bpSlices.get(bpid);
     if (bp != null) {
-      bp.shutdown();
+      bp.shutdown(blocksListsAsLongs);
     }
     bpSlices.remove(bpid);
   }

+ 5 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java

@@ -35,10 +35,12 @@ import java.util.concurrent.atomic.AtomicReference;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Time;
 
@@ -428,9 +430,10 @@ class FsVolumeList {
         bpid + ": " + totalTimeTaken + "ms");
   }
   
-  void removeBlockPool(String bpid) {
+  void removeBlockPool(String bpid, Map<DatanodeStorage, BlockListAsLongs>
+      blocksPerVolume) {
     for (FsVolumeImpl v : volumes.get()) {
-      v.shutdownBlockPool(bpid);
+      v.shutdownBlockPool(bpid, blocksPerVolume.get(v.toDatanodeStorage()));
     }
   }
 

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java

@@ -304,10 +304,11 @@ public class UpgradeUtilities {
         continue;
       }
 
-      // skip VERSION and dfsUsed file for DataNodes
+      // skip VERSION and dfsUsed and replicas file for DataNodes
       if (nodeType == DATA_NODE &&
           (list[i].getName().equals("VERSION") ||
-              list[i].getName().equals("dfsUsed"))) {
+              list[i].getName().equals("dfsUsed") ||
+              list[i].getName().equals("replicas"))) {
         continue;
       }
 

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java

@@ -218,4 +218,11 @@ public class DataNodeTestUtils {
       }
     }
   }
+  
+  public static void runDirectoryScanner(DataNode dn) throws IOException {
+    DirectoryScanner directoryScanner = dn.getDirectoryScanner();
+    if (directoryScanner != null) {
+      dn.getDirectoryScanner().reconcile();
+    }
+  }
 }

+ 152 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java

@@ -17,14 +17,25 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
@@ -34,6 +45,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.junit.Assert;
 import org.junit.Test;
@@ -501,4 +513,144 @@ public class TestWriteToReplica {
           + "genstamp and replaced it with the newer one: " + blocks[NON_EXISTENT]);
     }
   }
+  
+  /**
+   * This is a test to check the replica map before and after the datanode 
+   * quick restart (less than 5 minutes)
+   * @throws Exception
+   */
+  @Test
+  public  void testReplicaMapAfterDatanodeRestart() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
+        .build();
+    try {
+      cluster.waitActive();
+      NameNode nn1 = cluster.getNameNode(0);
+      NameNode nn2 = cluster.getNameNode(1);
+      assertNotNull("cannot create nn1", nn1);
+      assertNotNull("cannot create nn2", nn2);
+      
+      // check number of volumes in fsdataset
+      DataNode dn = cluster.getDataNodes().get(0);
+      FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.
+          getFSDataset(dn);
+      ReplicaMap replicaMap = dataSet.volumeMap;
+      
+      List<FsVolumeImpl> volumes = dataSet.getVolumes();
+      // number of volumes should be 2 - [data1, data2]
+      assertEquals("number of volumes is wrong", 2, volumes.size());
+      ArrayList<String> bpList = new ArrayList<String>(Arrays.asList(
+          cluster.getNamesystem(0).getBlockPoolId(), 
+          cluster.getNamesystem(1).getBlockPoolId()));
+      
+      Assert.assertTrue("Cluster should have 2 block pools", 
+          bpList.size() == 2);
+      
+      createReplicas(bpList, volumes, replicaMap);
+      ReplicaMap oldReplicaMap = new ReplicaMap(this);
+      oldReplicaMap.addAll(replicaMap);
+      
+      cluster.restartDataNode(0);
+      cluster.waitActive();
+      dn = cluster.getDataNodes().get(0);
+      dataSet = (FsDatasetImpl) dn.getFSDataset();
+      testEqualityOfReplicaMap(oldReplicaMap, dataSet.volumeMap, bpList);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  /**
+   * Compare the replica map before and after the restart
+   **/
+  private void testEqualityOfReplicaMap(ReplicaMap oldReplicaMap, ReplicaMap 
+      newReplicaMap, List<String> bpidList) {
+    // Traversing through newReplica map and remove the corresponding 
+    // replicaInfo from oldReplicaMap.
+    for (String bpid: bpidList) {
+      for (ReplicaInfo info: newReplicaMap.replicas(bpid)) {
+        assertNotNull("Volume map before restart didn't contain the "
+            + "blockpool: " + bpid, oldReplicaMap.replicas(bpid));
+        
+        ReplicaInfo oldReplicaInfo = oldReplicaMap.get(bpid, 
+            info.getBlockId());
+        // Volume map after restart contains a blockpool id which 
+        assertNotNull("Old Replica Map didnt't contain block with blockId: " +
+            info.getBlockId(), oldReplicaInfo);
+        
+        ReplicaState oldState = oldReplicaInfo.getState();
+        // Since after restart, all the RWR, RBW and RUR blocks gets 
+        // converted to RWR
+        if (info.getState() == ReplicaState.RWR) {
+           if (oldState == ReplicaState.RWR || oldState == ReplicaState.RBW 
+               || oldState == ReplicaState.RUR) {
+             oldReplicaMap.remove(bpid, oldReplicaInfo);
+           }
+        } else if (info.getState() == ReplicaState.FINALIZED && 
+            oldState == ReplicaState.FINALIZED) {
+          oldReplicaMap.remove(bpid, oldReplicaInfo);
+        }
+      }
+    }
+    
+    // We don't persist the ReplicaInPipeline replica
+    // and if the old replica map contains any replica except ReplicaInPipeline
+    // then we didn't persist that replica
+    for (String bpid: bpidList) {
+      for (ReplicaInfo replicaInfo: oldReplicaMap.replicas(bpid)) {
+        if (replicaInfo.getState() != ReplicaState.TEMPORARY) {
+          Assert.fail("After datanode restart we lost the block with blockId: "
+              +  replicaInfo.getBlockId());
+        }
+      }
+    }
+  }
+
+  private void createReplicas(List<String> bpList, List<FsVolumeImpl> volumes,
+      ReplicaMap volumeMap) throws IOException {
+    Assert.assertTrue("Volume map can't be null" , volumeMap != null);
+    
+    // Here we create all different type of replicas and add it
+    // to volume map. 
+    // Created all type of ReplicaInfo, each under Blkpool corresponding volume
+    long id = 1; // This variable is used as both blockId and genStamp
+    for (String bpId: bpList) {
+      for (FsVolumeImpl volume: volumes) {
+        ReplicaInfo finalizedReplica = new FinalizedReplica(id, 1, id, volume,
+            DatanodeUtil.idToBlockDir(volume.getFinalizedDir(bpId), id));
+        volumeMap.add(bpId, finalizedReplica);
+        id++;
+        
+        ReplicaInfo rbwReplica = new ReplicaBeingWritten(id, 1, id, volume, 
+            volume.getRbwDir(bpId), null, 100);
+        volumeMap.add(bpId, rbwReplica);
+        id++;
+
+        ReplicaInfo rwrReplica = new ReplicaWaitingToBeRecovered(id, 1, id, 
+            volume, volume.getRbwDir(bpId));
+        volumeMap.add(bpId, rwrReplica);
+        id++;
+        
+        ReplicaInfo ripReplica = new ReplicaInPipeline(id, id, volume, 
+            volume.getTmpDir(bpId), 0);
+        volumeMap.add(bpId, ripReplica);
+        id++;
+      }
+    }
+    
+    for (String bpId: bpList) {
+      for (ReplicaInfo replicaInfo: volumeMap.replicas(bpId)) {
+        File parentFile = replicaInfo.getBlockFile().getParentFile();
+        if (!parentFile.exists()) {
+          if (!parentFile.mkdirs()) {
+            throw new IOException("Failed to mkdirs " + parentFile);
+          }
+        }
+        replicaInfo.getBlockFile().createNewFile();
+        replicaInfo.getMetaFile().createNewFile();
+      }
+    }
+  }
 }

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java

@@ -41,6 +41,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -483,6 +485,10 @@ public class TestListCorruptFileBlocks {
         }
       }
 
+      // Run the DirectoryScanner to update the DataNode's volumeMap
+      DataNode dn = cluster.getDataNodes().get(0);
+      DataNodeTestUtils.runDirectoryScanner(dn);
+
       // Occasionally the BlockPoolSliceScanner can run before we have removed
       // the blocks. Restart the Datanode to trigger the scanner into running
       // once more.

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.junit.Test;
 
 public class TestProcessCorruptBlocks {
@@ -269,6 +270,8 @@ public class TestProcessCorruptBlocks {
     // But the datadirectory will not change
     assertTrue(cluster.corruptReplica(dnIndex, block));
 
+    // Run directory scanner to update the DN's volume map  
+    DataNodeTestUtils.runDirectoryScanner(cluster.getDataNodes().get(0));
     DataNodeProperties dnProps = cluster.stopDataNode(0);
 
     // Each datanode has multiple data dirs, check each

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.util.ThreadUtil;
 import org.junit.Test;
 
@@ -69,6 +70,8 @@ public class TestPendingCorruptDnMessages {
       ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
       assertTrue(cluster.changeGenStampOfBlock(0, block, 900));
       
+      // Run the DirectoryScanner to update the DataNode's volumeMap
+      DataNodeTestUtils.runDirectoryScanner(cluster.getDataNodes().get(0));
       // Stop the DN so the replica with the changed gen stamp will be reported
       // when this DN starts up.
       DataNodeProperties dnProps = cluster.stopDataNode(0);