
HDFS-544. Add a "rbw" subdir to DataNode data directory. Contributed by Hairong Kuang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-265@807379 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang 16 years ago
parent
commit
163e6dca7d

+ 2 - 0
CHANGES.txt

@@ -18,6 +18,8 @@ Trunk (unreleased changes)
     HDFS-517. Introduce BlockInfoUnderConstruction to reflect block replica
     states while writing. (shv)
 
+    HDFS-544. Add a "rbw" subdir to DataNode data directory. (hairong)
+
   IMPROVEMENTS
 
     HDFS-381. Remove blocks from DataNode maps when corresponding file

+ 4 - 2
src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java

@@ -86,7 +86,9 @@ public interface FSConstants {
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -19;
+  public static final int LAYOUT_VERSION = -20;
   // Current version: 
-  // -19: Sticky bit
+  // -20: DataNode adds a "rbw" sub directory to data directory
+  //      current dir contains "finalized" subdir for finalized replicas
+  //      and "rbw" subdir for replicas being written to.
 }

+ 3 - 0
src/java/org/apache/hadoop/hdfs/server/common/Storage.java

@@ -74,6 +74,9 @@ public abstract class Storage extends StorageInfo {
    * any upgrade code that uses this constant should also be removed. */
   public static final int PRE_GENERATIONSTAMP_LAYOUT_VERSION = -13;
   
+  // last layout version that did not support persistent rbw replicas
+  public static final int PRE_RBW_LAYOUT_VERSION = -19;
+  
   private   static final String STORAGE_FILE_LOCK     = "in_use.lock";
   protected static final String STORAGE_FILE_VERSION  = "VERSION";
   public static final String STORAGE_DIR_CURRENT   = "current";

+ 31 - 3
src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java

@@ -53,6 +53,8 @@ public class DataStorage extends Storage {
   final static String BLOCK_SUBDIR_PREFIX = "subdir";
   final static String BLOCK_FILE_PREFIX = "blk_";
   final static String COPY_FILE_PREFIX = "dncp_";
+  final static String STORAGE_DIR_RBW = "rbw";
+  final static String STORAGE_DIR_FINALIZED = "finalized";
   
   private String storageID;
 
@@ -277,8 +279,8 @@ public class DataStorage extends Storage {
     assert !tmpDir.exists() : "previous.tmp directory must not exist.";
     // rename current to tmp
     rename(curDir, tmpDir);
-    // hardlink blocks
-    linkBlocks(tmpDir, curDir, this.getLayoutVersion());
+    // hard link finalized & rbw blocks
+    linkAllBlocks(tmpDir, curDir);
     // write version file
     this.layoutVersion = FSConstants.LAYOUT_VERSION;
     assert this.namespaceID == nsInfo.getNamespaceID() :
@@ -359,8 +361,34 @@ public class DataStorage extends Storage {
       doFinalize(it.next());
     }
   }
+
+  /**
+   * Hardlink all finalized and RBW blocks in fromDir to toDir
+   * @param fromDir directory where the snapshot is stored
+   * @param toDir the current data directory
+   * @throws IOException if an error occurs during hardlinking
+   */
+  private void linkAllBlocks(File fromDir, File toDir) throws IOException {
+    // do the link
+    int diskLayoutVersion = this.getLayoutVersion();
+    if (diskLayoutVersion < PRE_RBW_LAYOUT_VERSION) { // RBW version
+      // hardlink finalized blocks in tmpDir/finalized
+      linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED), 
+          new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion);
+      // hardlink rbw blocks in tmpDir/rbw
+      linkBlocks(new File(fromDir, STORAGE_DIR_RBW), 
+          new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion);
+    } else { // pre-RBW version
+      // hardlink finalized blocks in tmpDir
+      linkBlocks(fromDir, 
+          new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion);      
+    }    
+  }
   
   static void linkBlocks(File from, File to, int oldLV) throws IOException {
+    if (!from.exists()) {
+      return;
+    }
     if (!from.isDirectory()) {
       if (from.getName().startsWith(COPY_FILE_PREFIX)) {
         FileInputStream in = new FileInputStream(from);
@@ -387,7 +415,7 @@ public class DataStorage extends Storage {
       return;
     }
     // from is a directory
-    if (!to.mkdir())
+    if (!to.mkdirs())
       throw new IOException("Cannot create directory " + to);
     String[] blockNames = from.list(new java.io.FilenameFilter() {
         public boolean accept(File dir, String name) {
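
For reference, a minimal sketch of how the constants added above compose into the on-disk layout at layout version -20 (illustrative only; dataDir stands for one configured dfs.data.dir volume):

  // Illustrative sketch: DataNode directory layout after this change.
  File current   = new File(dataDir, Storage.STORAGE_DIR_CURRENT);   // "current", holds the VERSION file
  File finalized = new File(current, "finalized");                   // DataStorage.STORAGE_DIR_FINALIZED: finalized replicas
  File rbw       = new File(current, "rbw");                         // DataStorage.STORAGE_DIR_RBW: replicas being written
  File tmp       = new File(current.getParentFile(), "tmp");         // temporary replicas, sibling of "current"
  File detach    = new File(current.getParentFile(), "detach");      // copy-on-write blocks for snapshots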

+ 157 - 24
src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -37,6 +39,7 @@ import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.DU;
 import org.apache.hadoop.fs.FileUtil;
@@ -54,8 +57,7 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.io.IOUtils;
 
 import org.mortbay.log.Log;
 
@@ -199,21 +201,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
         }
       }
 
-      File blockFiles[] = dir.listFiles();
-      for (File blockFile : blockFiles) {
-        if (Block.isBlockFilename(blockFile)) {
-          long genStamp = getGenerationStampFromFile(blockFiles, blockFile);
-          long blockId = Block.filename2id(blockFile.getName());
-          ReplicaInfo oldReplica = volumeMap.add(
-              new FinalizedReplica(blockId, blockFile.length(), genStamp, 
-              volume, blockFile.getParentFile()));
-          if (oldReplica != null) {
-            DataNode.LOG.warn("Two block files have the same block id exits " +
-            		"on disk: " + oldReplica.getBlockFile() +
-            		" and " + blockFile );
-          }
-        }
-      }
+      volume.addToReplicasMap(volumeMap, dir, true);
     }
         
     /**
@@ -293,8 +281,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
   }
 
   class FSVolume {
-    private FSDir dataDir;
-    private File tmpDir;
+    private FSDir dataDir;      // directory that stores finalized replicas
+    private File rbwDir;        // directory that stores RBW replicas
+    private File tmpDir;        // directory that stores temporary replicas
     private File detachDir; // copy on write for blocks in snapshot
     private DF usage;
     private DU dfsUsage;
@@ -305,10 +294,12 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
       this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
       boolean supportAppends = conf.getBoolean("dfs.support.append", false);
       File parent = currentDir.getParentFile();
+      final File finalizedDir = new File(
+          currentDir, DataStorage.STORAGE_DIR_FINALIZED);
 
       this.detachDir = new File(parent, "detach");
       if (detachDir.exists()) {
-        recoverDetachedBlocks(currentDir, detachDir);
+        recoverDetachedBlocks(finalizedDir, detachDir);
       }
 
       // Files that were being written when the datanode was last shutdown
@@ -319,12 +310,21 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
       this.tmpDir = new File(parent, "tmp");
       if (tmpDir.exists()) {
         if (supportAppends) {
-          recoverDetachedBlocks(currentDir, tmpDir);
+          recoverDetachedBlocks(finalizedDir, tmpDir);
         } else {
           FileUtil.fullyDelete(tmpDir);
         }
       }
-      this.dataDir = new FSDir(currentDir);
+      this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
+      if (rbwDir.exists() && !supportAppends) {
+        FileUtil.fullyDelete(rbwDir);
+      }
+      this.dataDir = new FSDir(finalizedDir);
+      if (!rbwDir.mkdirs()) {  // create rbw directory if it does not exist
+        if (!rbwDir.isDirectory()) {
+          throw new IOException("Mkdirs failed to create " + rbwDir.toString());
+        }
+      }
       if (!tmpDir.mkdirs()) {
         if (!tmpDir.isDirectory()) {
           throw new IOException("Mkdirs failed to create " + tmpDir.toString());
@@ -429,10 +429,117 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     void checkDirs() throws DiskErrorException {
       dataDir.checkDirTree();
       DiskChecker.checkDir(tmpDir);
+      DiskChecker.checkDir(rbwDir);
     }
       
     void getVolumeMap(ReplicasMap volumeMap) {
+      // add finalized replicas
       dataDir.getVolumeMap(volumeMap, this);
+      // add rbw replicas
+      addToReplicasMap(volumeMap, rbwDir, false);
+    }
+
+    /**
+     * Add replicas under the given directory to the volume map
+     * @param volumeMap the replicas map
+     * @param dir an input directory
+     * @param isFinalized true if the directory has finalized replicas;
+     *                    false if the directory has rbw replicas
+     */
+    private void addToReplicasMap(ReplicasMap volumeMap, 
+        File dir, boolean isFinalized) {
+      File blockFiles[] = dir.listFiles();
+      for (File blockFile : blockFiles) {
+        if (!Block.isBlockFilename(blockFile))
+          continue;
+        
+        long genStamp = getGenerationStampFromFile(blockFiles, blockFile);
+        long blockId = Block.filename2id(blockFile.getName());
+        ReplicaInfo newReplica = null;
+        if (isFinalized) {
+          newReplica = new FinalizedReplica(blockId, 
+              blockFile.length(), genStamp, this, blockFile.getParentFile());
+        } else {
+          newReplica = new ReplicaWaitingToBeRecovered(blockId,
+              validateIntegrity(blockFile, genStamp), 
+              genStamp, this, blockFile.getParentFile());
+        }
+
+        ReplicaInfo oldReplica = volumeMap.add(newReplica);
+        if (oldReplica != null) {
+          DataNode.LOG.warn("Two block files with the same block id exist " +
+              "on disk: " + oldReplica.getBlockFile() +
+              " and " + blockFile );
+        }
+      }
+    }
+    
+    /**
+     * Find out the number of bytes in the block that match its crc.
+     * 
+     * This algorithm assumes that data corruption caused by unexpected 
+     * datanode shutdown occurs only in the last crc chunk. So it checks
+     * only the last chunk.
+     * 
+     * @param blockFile the block file
+     * @param genStamp generation stamp of the block
+     * @return the number of valid bytes
+     */
+    private long validateIntegrity(File blockFile, long genStamp) {
+      DataInputStream checksumIn = null;
+      InputStream blockIn = null;
+      try {
+        File metaFile = new File(getMetaFileName(blockFile.toString(), genStamp));
+        long blockFileLen = blockFile.length();
+        long metaFileLen = metaFile.length();
+        int crcHeaderLen = DataChecksum.getChecksumHeaderSize();
+        if (!blockFile.exists() || blockFileLen == 0 ||
+            !metaFile.exists() || metaFileLen < (long)crcHeaderLen) {
+          return 0;
+        }
+        checksumIn = new DataInputStream(
+            new BufferedInputStream(new FileInputStream(metaFile),
+                BUFFER_SIZE));
+
+        // read and handle the common header here. For now just a version
+        BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
+        short version = header.getVersion();
+        if (version != FSDataset.METADATA_VERSION) {
+          DataNode.LOG.warn("Wrong version (" + version + ") for metadata file "
+              + metaFile + " ignoring ...");
+        }
+        DataChecksum checksum = header.getChecksum();
+        int bytesPerChecksum = checksum.getBytesPerChecksum();
+        int checksumSize = checksum.getChecksumSize();
+        long numChunks = Math.min(
+            (blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum, 
+            (metaFileLen - crcHeaderLen)/checksumSize);
+        if (numChunks == 0) {
+          return 0;
+        }
+        IOUtils.skipFully(checksumIn, (numChunks-1)*checksumSize);
+        blockIn = new FileInputStream(blockFile);
+        long lastChunkStartPos = (numChunks-1)*bytesPerChecksum;
+        IOUtils.skipFully(blockIn, lastChunkStartPos);
+        int lastChunkSize = (int)Math.min(
+            bytesPerChecksum, blockFileLen-lastChunkStartPos);
+        byte[] buf = new byte[lastChunkSize+checksumSize];
+        checksumIn.readFully(buf, lastChunkSize, checksumSize);
+        IOUtils.readFully(blockIn, buf, 0, lastChunkSize);
+
+        checksum.update(buf, 0, lastChunkSize);
+        if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
+          return lastChunkStartPos + lastChunkSize;
+        } else { // last chunk is corrupt
+          return lastChunkStartPos;
+        }
+      } catch (IOException e) {
+        DataNode.LOG.warn(e);
+        return 0;
+      } finally {
+        IOUtils.closeStream(checksumIn);
+        IOUtils.closeStream(blockIn);
+      }
     }
       
     void clearPath(File f) {
@@ -440,7 +547,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     }
       
     public String toString() {
-      return dataDir.dir.getAbsolutePath();
+      return getDir().getAbsolutePath();
     }
 
     /**
@@ -630,6 +737,26 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
     return getMetaFile(getBlockFile(b), b);
   }
 
+  /** Find the metadata file for the specified block file.
+   * Return the generation stamp from the name of the metafile.
+   */
+  private static long getGenerationStampFromFile(File[] listdir, File blockFile) {
+    String blockName = blockFile.getName();
+    for (int j = 0; j < listdir.length; j++) {
+      String path = listdir[j].getName();
+      if (!path.startsWith(blockName)) {
+        continue;
+      }
+      if (blockFile == listdir[j]) {
+        continue;
+      }
+      return Block.getGenerationStamp(listdir[j].getName());
+    }
+    DataNode.LOG.warn("Block " + blockFile + 
+                      " does not have a metafile!");
+    return Block.GRANDFATHER_GENERATION_STAMP;
+  }
+
   /** Find the corresponding meta data file from a given block file */
   private static File findMetaFile(final File blockFile) throws IOException {
     final String prefix = blockFile.getName() + "_";
@@ -1414,7 +1541,13 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
           error = true;
           continue;
         }
-        v.clearPath(parent);
+        ReplicaState replicaState = dinfo.getState();
+        if (replicaState == ReplicaState.FINALIZED || 
+            (replicaState == ReplicaState.RUR && 
+                ((ReplicaUnderRecovery)dinfo).getOrignalReplicaState() == 
+                  ReplicaState.FINALIZED)) {
+          v.clearPath(parent);
+        }
         volumeMap.remove(invalidBlks[i]);
       }
       File metaFile = getMetaFile( f, invalidBlks[i] );

+ 1 - 1
src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java

@@ -96,7 +96,7 @@ import org.apache.hadoop.io.WritableUtils;
 class ImageLoaderCurrent implements ImageLoader {
   protected final DateFormat dateFormat = 
                                       new SimpleDateFormat("yyyy-MM-dd HH:mm");
-  private static int [] versions = {-16, -17, -18, -19};
+  private static int [] versions = {-16, -17, -18, -19, -20};
   private int imageVersion = 0;
 
   /* (non-Javadoc)

+ 1 - 1
src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -110,7 +110,7 @@ public class DFSTestUtil {
   /** create nFiles with random names and directory hierarchies
    *  with random (but reproducible) data in them.
    */
-  void createFiles(FileSystem fs, String topdir,
+  public void createFiles(FileSystem fs, String topdir,
                    short replicationFactor) throws IOException {
     files = new MyFile[nFiles];
     

+ 3 - 1
src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java

@@ -74,6 +74,8 @@ public class MiniDFSCluster {
   private File base_dir;
   private File data_dir;
   
+  public final static String FINALIZED_DIR_NAME = "/current/finalized/";
+  
   
   /**
    * This null constructor is used only when wishing to start a data node cluster
@@ -631,7 +633,7 @@ public class MiniDFSCluster {
     if (i < 0 || i >= dataNodes.size())
       return false;
     for (int dn = i*2; dn < i*2+2; dn++) {
-      File blockFile = new File(dataDir, "data" + (dn+1) + "/current/" +
+      File blockFile = new File(dataDir, "data" + (dn+1) + FINALIZED_DIR_NAME +
                                 blockName);
       System.out.println("Corrupting for: " + blockFile);
       if (blockFile.exists()) {

+ 3 - 3
src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java

@@ -123,14 +123,14 @@ public class TestBlockMissingException extends TestCase {
    * The Data directories for a datanode
    */
   private File[] getDataNodeDirs(int i) throws IOException {
-    File base_dir = new File(System.getProperty("test.build.data"), "dfs/");
+    String base_dir = MiniDFSCluster.getBaseDirectory();
     File data_dir = new File(base_dir, "data");
     File dir1 = new File(data_dir, "data"+(2*i+1));
     File dir2 = new File(data_dir, "data"+(2*i+2));
     if (dir1.isDirectory() && dir2.isDirectory()) {
       File[] dir = new File[2];
-      dir[0] = new File(dir1, "current");
-      dir[1] = new File(dir2, "current"); 
+      dir[0] = new File(dir1, MiniDFSCluster.FINALIZED_DIR_NAME);
+      dir[1] = new File(dir2, MiniDFSCluster.FINALIZED_DIR_NAME); 
       return dir;
     }
     return new File[0];

+ 10 - 10
src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java

@@ -134,8 +134,8 @@ public class TestDatanodeBlockScanner extends TestCase {
     File baseDir = new File(System.getProperty("test.build.data"), "dfs/data");
     boolean corrupted = false;
     for (int i=replica*2; i<replica*2+2; i++) {
-      File blockFile = new File(baseDir, "data" + (i+1)+ "/current/" + 
-                               blockName);
+      File blockFile = new File(baseDir, "data" + (i+1) + 
+          MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
       if (blockFile.exists()) {
         // Corrupt replica by writing random bytes into replica
         RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
@@ -175,7 +175,7 @@ public class TestDatanodeBlockScanner extends TestCase {
                    getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
       blockCount = blocks.get(0).getLocations().length;
       try {
-        LOG.info("Looping until expected blockCount of 3 is received");
+        LOG.info("Looping until expected blockCount of 3 is received: " + blockCount);
         Thread.sleep(1000);
       } catch (InterruptedException ignore) {
       }
@@ -194,7 +194,7 @@ public class TestDatanodeBlockScanner extends TestCase {
                    getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
       blockCount = blocks.get(0).getLocations().length;
       try {
-        LOG.info("Looping until expected blockCount of 2 is received");
+        LOG.info("Looping until expected blockCount of 2 is received: " + blockCount);
         Thread.sleep(1000);
       } catch (InterruptedException ignore) {
       }
@@ -412,8 +412,8 @@ public class TestDatanodeBlockScanner extends TestCase {
   static boolean changeReplicaLength(String blockName, int dnIndex, int lenDelta) throws IOException {
     File baseDir = new File(System.getProperty("test.build.data"), "dfs/data");
     for (int i=dnIndex*2; i<dnIndex*2+2; i++) {
-      File blockFile = new File(baseDir, "data" + (i+1)+ "/current/" + 
-                               blockName);
+      File blockFile = new File(baseDir, "data" + (i+1) + 
+          MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
       if (blockFile.exists()) {
         RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
         raFile.setLength(raFile.length()+lenDelta);
@@ -427,10 +427,10 @@ public class TestDatanodeBlockScanner extends TestCase {
   private static void waitForBlockDeleted(String blockName, int dnIndex) 
   throws IOException, InterruptedException {
     File baseDir = new File(System.getProperty("test.build.data"), "dfs/data");
-    File blockFile1 = new File(baseDir, "data" + (2*dnIndex+1)+ "/current/" + 
-        blockName);
-    File blockFile2 = new File(baseDir, "data" + (2*dnIndex+2)+ "/current/" + 
-        blockName);
+    File blockFile1 = new File(baseDir, "data" + (2*dnIndex+1) + 
+        MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
+    File blockFile2 = new File(baseDir, "data" + (2*dnIndex+2) + 
+        MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
     while (blockFile1.exists() || blockFile2.exists()) {
       Thread.sleep(100);
     }

+ 2 - 2
src/test/hdfs/org/apache/hadoop/hdfs/TestFileCorruption.java

@@ -110,11 +110,11 @@ public class TestFileCorruption extends TestCase {
       
       // get the block
       File dataDir = new File(cluster.getDataDirectory(),
-          "data1/current");
+          "data1" + MiniDFSCluster.FINALIZED_DIR_NAME);
       Block blk = getBlock(dataDir);
       if (blk == null) {
         blk = getBlock(new File(cluster.getDataDirectory(),
-          "dfs/data/data2/current"));
+          "dfs/data/data2" + MiniDFSCluster.FINALIZED_DIR_NAME));
       }
       assertFalse(blk==null);
 

+ 3 - 2
src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java

@@ -291,7 +291,7 @@ public class TestReplication extends TestCase {
    * for under replicated blocks. 
    * 
    * It creates a file with one block and replication of 4. It corrupts 
-   * two of the blocks and removes one of the replicas. Expected behaviour is
+   * two of the blocks and removes one of the replicas. Expected behavior is
    * that missing replica will be copied from one valid source.
    */
   public void testPendingReplicationRetry() throws IOException {
@@ -341,7 +341,8 @@ public class TestReplication extends TestCase {
       
       int fileCount = 0;
       for (int i=0; i<6; i++) {
-        File blockFile = new File(baseDir, "data" + (i+1) + "/current/" + block);
+        File blockFile = new File(baseDir, "data" + (i+1) + 
+            MiniDFSCluster.FINALIZED_DIR_NAME + block);
         LOG.info("Checking for file " + blockFile);
         
         if (blockFile.exists()) {

+ 2 - 2
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java

@@ -98,7 +98,7 @@ public class TestDataNodeVolumeFailure extends TestCase{
     // fail the volume
     // delete/make non-writable one of the directories (failed volume)
     data_fail = new File(dataDir, "data3");
-    failedDir = new File(data_fail, "current");
+    failedDir = new File(data_fail, MiniDFSCluster.FINALIZED_DIR_NAME);
     if (failedDir.exists() &&
         //!FileUtil.fullyDelete(failedDir)
         !deteteBlocks(failedDir)
@@ -302,7 +302,7 @@ public class TestDataNodeVolumeFailure extends TestCase{
     int total = 0;
     for(int i=0; i<dn_num; i++) {
       for(int j=1; j<=2; j++) {
-        File dir = new File(new File(dataDir, "data"+(2*i+j)), "current");
+        File dir = new File(dataDir, "data"+(2*i+j)+MiniDFSCluster.FINALIZED_DIR_NAME);
         if(dir == null) {
           System.out.println("dir is null for dn=" + i + " and data_dir=" + j);
           continue;

+ 126 - 0
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java

@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+import org.apache.hadoop.io.IOUtils;
+
+import junit.framework.TestCase;
+
+/** Test if a datanode can correctly upgrade itself */
+public class TestDatanodeRestart extends TestCase {
+  // test finalized replicas persist across DataNode restarts
+  public void testFinalizedReplicas() throws Exception {
+    // bring up a cluster of 3
+    Configuration conf = new Configuration();
+    conf.setLong("dfs.block.size", 1024L);
+    conf.setInt("dfs.write.packet.size", 512);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+    cluster.waitActive();
+    FileSystem fs = cluster.getFileSystem();
+    try {
+      // test finalized replicas
+      final String TopDir = "/test";
+      DFSTestUtil util = new DFSTestUtil("TestCrcCorruption", 2, 3, 8*1024);
+      util.createFiles(fs, TopDir, (short)3);
+      util.waitReplication(fs, TopDir, (short)3);
+      util.checkFiles(fs, TopDir);
+      cluster.restartDataNodes();
+      cluster.waitActive();
+      util.checkFiles(fs, TopDir);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  // test rbw replicas persist across DataNode restarts
+  public void testRbwReplicas() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setLong("dfs.block.size", 1024L);
+    conf.setInt("dfs.write.packet.size", 512);
+    conf.setBoolean("dfs.support.append", true);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    cluster.waitActive();
+    try {
+      testRbwReplicas(cluster, false);
+      testRbwReplicas(cluster, true);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+    
+  private void testRbwReplicas(MiniDFSCluster cluster, boolean isCorrupt) 
+  throws IOException {
+    FSDataOutputStream out = null;
+    try {
+      FileSystem fs = cluster.getFileSystem();
+      final int fileLen = 515;
+      // create some rbw replicas on disk
+      byte[] writeBuf = new byte[fileLen];
+      new Random().nextBytes(writeBuf);
+      final Path src = new Path("/test.txt");
+      out = fs.create(src);
+      out.write(writeBuf);
+      out.sync();
+      DataNode dn = cluster.getDataNodes().get(0);
+      // move tmp replicas to be rbw replicas: this is a temporary trick
+      for (FSVolume volume : ((FSDataset)dn.data).volumes.volumes) {
+        File currentDir = volume.getDir().getParentFile();
+        File tmpDir = new File(currentDir.getParentFile(), "tmp");
+        File rbwDir = new File(currentDir, "rbw");
+        for (File file : tmpDir.listFiles()) {
+          if (isCorrupt && Block.isBlockFilename(file)) {
+            new RandomAccessFile(file, "rw").setLength(fileLen-1); // corrupt
+          }
+          file.renameTo(new File(rbwDir, file.getName()));
+        }
+      }
+      cluster.restartDataNodes();
+      cluster.waitActive();
+      dn = cluster.getDataNodes().get(0);
+
+      // check volumeMap: one rwr replica
+      ReplicasMap replicas = ((FSDataset)(dn.data)).volumeMap;
+      assertEquals(1, replicas.size());
+      ReplicaInfo replica = replicas.replicas().iterator().next();
+      assertEquals(ReplicaState.RWR, replica.getState());
+      if (isCorrupt) {
+        assertEquals((fileLen-1)/512*512, replica.getNumBytes());
+      } else {
+        assertEquals(fileLen, replica.getNumBytes());
+      }
+      dn.data.invalidate(new Block[]{replica});
+      fs.delete(src, false);
+    } finally {
+      IOUtils.closeStream(out);
+    }      
+  }
+}
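
For reference, a worked sketch of the arithmetic behind the corrupt-replica assertion above; it mirrors the last-chunk check that validateIntegrity() in FSDataset.java performs when loading the rbw replica, assuming the default 512-byte checksum chunk (bytesPerChecksum):

  // fileLen == 515; the corrupt case truncates the block file to 514 bytes
  long blockFileLen     = 514;
  int  bytesPerChecksum = 512;
  long numChunks        = (blockFileLen + bytesPerChecksum - 1) / bytesPerChecksum; // 2
  long lastChunkStart   = (numChunks - 1) * bytesPerChecksum;                       // 512
  // The remaining 2-byte last chunk no longer matches the CRC stored for the
  // original 3-byte chunk, so validateIntegrity() returns lastChunkStart = 512,
  // which equals (fileLen-1)/512*512; without corruption it returns 512 + 3 = 515.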

+ 4 - 3
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

@@ -182,7 +182,8 @@ public class TestFsck extends TestCase {
       File baseDir = new File(System.getProperty("test.build.data",
                                                  "build/test/data"),"dfs/data");
       for (int i=0; i<8; i++) {
-        File blockFile = new File(baseDir, "data" +(i+1)+ "/current/" + block);
+        File blockFile = new File(baseDir, "data" +(i+1) + 
+            MiniDFSCluster.FINALIZED_DIR_NAME + block);
         if(blockFile.exists()) {
           assertTrue(blockFile.delete());
         }
@@ -294,8 +295,8 @@ public class TestFsck extends TestCase {
     File baseDir = new File(System.getProperty("test.build.data",
                                                "build/test/data"),"dfs/data");
     for (int i=0; i < 6; i++) {
-      File blockFile = new File(baseDir, "data" + (i+1) + "/current/" +
-                                block);
+      File blockFile = new File(baseDir, "data" + (i+1) + 
+          MiniDFSCluster.FINALIZED_DIR_NAME + block);
       if (blockFile.exists()) {
         RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
         FileChannel channel = raFile.getChannel();

+ 2 - 1
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java

@@ -56,7 +56,8 @@ public class TestOverReplicatedBlocks extends TestCase {
       DataNodeProperties dnProps = cluster.stopDataNode(0);
       // remove block scanner log to trigger block scanning
       File scanLog = new File(System.getProperty("test.build.data"),
-          "dfs/data/data1/current/dncp_block_verification.log.curr");
+          "dfs/data/data1" + MiniDFSCluster.FINALIZED_DIR_NAME + 
+          "dncp_block_verification.log.curr");
       //wait for one minute for deletion to succeed;
       for(int i=0; !scanLog.delete(); i++) {
         assertTrue("Could not delete log file in one minute", i < 60);