
HDFS-10636. Modify ReplicaInfo to remove the assumption that replica metadata and data are stored in java.io.File. (Virajith Jalaparti via lei)

Lei Xu, 8 years ago
commit 86c9862bec
41 changed files with 2069 additions and 1118 deletions
  1. +15 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
  2. +1 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
  3. +4 -3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
  4. +1 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  5. +1 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
  6. +2 -2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
  7. +5 -5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
  8. +26 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
  9. +479 -0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java
  10. +417 -0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java
  11. +8 -8
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
  12. +252 -0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
  13. +3 -3
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java
  14. +51 -273
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
  15. +0 -86
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
  16. +169 -201
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
  17. +12 -18
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
  18. +26 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
  19. +4 -5
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
  20. +49 -25
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
  21. +44 -27
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
  22. +199 -369
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
  23. +18 -0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
  24. +145 -9
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
  25. +1 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
  26. +17 -17
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
  27. +2 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
  28. +3 -3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestCrcCorruption.java
  29. +26 -4
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
  30. +5 -1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolSliceStorage.java
  31. +1 -1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
  32. +5 -1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeRollingUpgrade.java
  33. +8 -9
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
  34. +1 -1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
  35. +2 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
  36. +3 -3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
  37. +24 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
  38. +2 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java
  39. +21 -22
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
  40. +15 -5
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
  41. +2 -2
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
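
To make the new contract concrete before the per-file diffs: a minimal caller sketch, separate from the patch itself, that touches replica data only through the storage-agnostic accessors visible in the diffs below (blockDataExists, getDataInputStream). The names ReplicaReadExample and readPrefix are illustrative, not part of HDFS.

// Hypothetical caller: reads replica data through ReplicaInfo accessors
// instead of assuming the data lives in a java.io.File.
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;

class ReplicaReadExample {
  // Reads up to 'len' bytes from the start of a replica; works for any
  // ReplicaInfo subclass, local or otherwise.
  static byte[] readPrefix(ReplicaInfo replica, int len) throws IOException {
    if (!replica.blockDataExists()) {
      throw new IOException("Missing block data for " + replica);
    }
    try (InputStream in = replica.getDataInputStream(0)) {
      byte[] buf = new byte[len];
      int off = 0;
      while (off < len) {
        int n = in.read(buf, off, len - off);
        if (n < 0) {
          break; // replica is shorter than the requested prefix
        }
        off += n;
      }
      return buf;
    }
  }
}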

+ 15 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -741,7 +742,20 @@ public class BlockPoolSliceStorage extends Storage {
    *
    * @return the trash directory for a given block file that is being deleted.
    */
-  public String getTrashDirectory(File blockFile) {
+  public String getTrashDirectory(ReplicaInfo info) {
+
+    URI blockURI = info.getBlockURI();
+    try{
+      File blockFile = new File(blockURI);
+      return getTrashDirectory(blockFile);
+    } catch (IllegalArgumentException e) {
+      LOG.warn("Failed to get block file for replica " + info, e);
+    }
+
+    return null;
+  }
+
+  private String getTrashDirectory(File blockFile) {
     if (isTrashAllowed(blockFile)) {
       Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent());
       String trashDirectory = matcher.replaceFirst("$1$2" + TRASH_ROOT_DIR + "$4");
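
The conversion above leans on new File(URI) accepting only absolute file: URIs and throwing IllegalArgumentException for anything else, which is why replicas that are not file-backed end up returning null. A minimal standalone sketch of that guard (the helper name toLocalFileOrNull is hypothetical):

import java.io.File;
import java.net.URI;

class TrashDirHelperSketch {
  // Returns a File for file: URIs, or null when the replica is not backed
  // by a local file (new File(URI) rejects such URIs).
  static File toLocalFileOrNull(URI blockURI) {
    try {
      return new File(blockURI);
    } catch (IllegalArgumentException e) {
      return null;
    }
  }
}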

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -121,7 +121,7 @@ class BlockReceiver implements Closeable {
   /** the block to receive */
   private final ExtendedBlock block; 
   /** the replica to write */
-  private ReplicaInPipelineInterface replicaInfo;
+  private ReplicaInPipeline replicaInfo;
   /** pipeline stage */
   private final BlockConstructionStage stage;
   private final boolean isTransfer;

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -248,8 +249,8 @@ class BlockSender implements java.io.Closeable {
       }
       // if there is a write in progress
       ChunkChecksum chunkChecksum = null;
-      if (replica instanceof ReplicaBeingWritten) {
-        final ReplicaBeingWritten rbw = (ReplicaBeingWritten)replica;
+      if (replica.getState() == ReplicaState.RBW) {
+        final ReplicaInPipeline rbw = (ReplicaInPipeline) replica;
         waitForMinLength(rbw, startOffset + length);
         chunkChecksum = rbw.getLastChecksumAndDataLen();
       }
@@ -473,7 +474,7 @@ class BlockSender implements java.io.Closeable {
    * @param len minimum length to reach
    * @throws IOException on failing to reach the len in given wait time
    */
-  private static void waitForMinLength(ReplicaBeingWritten rbw, long len)
+  private static void waitForMinLength(ReplicaInPipeline rbw, long len)
       throws IOException {
     // Wait for 3 seconds for rbw replica to reach the minimum length
     for (int i = 0; i < 30 && rbw.getBytesOnDisk() < len; i++) {
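
BlockSender now keys off the replica state and the ReplicaInPipeline interface instead of the concrete ReplicaBeingWritten class. A minimal sketch of that dispatch; RbwCheckExample is a hypothetical class placed in the datanode package so it can reference ChunkChecksum the same way BlockSender does:

package org.apache.hadoop.hdfs.server.datanode;

import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;

class RbwCheckExample {
  // Any replica that reports RBW is treated as a write in progress and
  // queried through the ReplicaInPipeline interface, whatever concrete
  // class backs it.
  static ChunkChecksum lastChecksumIfRbw(Replica replica) {
    if (replica.getState() == ReplicaState.RBW) {
      return ((ReplicaInPipeline) replica).getLastChecksumAndDataLen();
    }
    return null; // not RBW: no in-flight checksum to wait on
  }
}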

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -3474,4 +3474,4 @@ public class DataNode extends ReconfigurableBase
   void setBlockScanner(BlockScanner blockScanner) {
     this.blockScanner = blockScanner;
   }
-}
+}

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java

@@ -56,6 +56,6 @@ public class DataNodeFaultInjector {
 
   public void failMirrorConnection() throws IOException { }
 
-  public void failPipeline(ReplicaInPipelineInterface replicaInfo,
+  public void failPipeline(ReplicaInPipeline replicaInfo,
       String mirrorAddr) throws IOException { }
 }

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java

@@ -204,9 +204,9 @@ public class DataStorage extends Storage {
    * @return trash directory if rolling upgrade is in progress, null
    *         otherwise.
    */
-  public String getTrashDirectoryForBlockFile(String bpid, File blockFile) {
+  public String getTrashDirectoryForReplica(String bpid, ReplicaInfo info) {
     if (trashEnabledBpids.contains(bpid)) {
-      return getBPStorage(bpid).getTrashDirectory(blockFile);
+      return getBPStorage(bpid).getTrashDirectory(info);
     }
     return null;
   }

+ 5 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java

@@ -597,14 +597,14 @@ public class DirectoryScanner implements Runnable {
         diffs.put(bpid, diffRecord);
         
         statsRecord.totalBlocks = blockpoolReport.length;
-        List<FinalizedReplica> bl = dataset.getFinalizedBlocks(bpid);
-        FinalizedReplica[] memReport = bl.toArray(new FinalizedReplica[bl.size()]);
+        List<ReplicaInfo> bl = dataset.getFinalizedBlocks(bpid);
+        ReplicaInfo[] memReport = bl.toArray(new ReplicaInfo[bl.size()]);
         Arrays.sort(memReport); // Sort based on blockId
   
         int d = 0; // index for blockpoolReport
         int m = 0; // index for memReprot
         while (m < memReport.length && d < blockpoolReport.length) {
-          FinalizedReplica memBlock = memReport[m];
+          ReplicaInfo memBlock = memReport[m];
           ScanInfo info = blockpoolReport[d];
           if (info.getBlockId() < memBlock.getBlockId()) {
             if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
@@ -633,7 +633,7 @@ public class DirectoryScanner implements Runnable {
             // or block file length is different than expected
             statsRecord.mismatchBlocks++;
             addDifference(diffRecord, statsRecord, info);
-          } else if (info.getBlockFile().compareTo(memBlock.getBlockFile()) != 0) {
+          } else if (memBlock.compareWith(info) != 0) {
             // volumeMap record and on-disk files don't match.
             statsRecord.duplicateBlocks++;
             addDifference(diffRecord, statsRecord, info);
@@ -652,7 +652,7 @@ public class DirectoryScanner implements Runnable {
           }
         }
         while (m < memReport.length) {
-          FinalizedReplica current = memReport[m++];
+          ReplicaInfo current = memReport[m++];
           addDifference(diffRecord, statsRecord,
                         current.getBlockId(), current.getVolume());
         }

+ 26 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java

@@ -22,11 +22,12 @@ import java.io.File;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 
 /**
  * This class describes a replica that has been finalized.
  */
-public class FinalizedReplica extends ReplicaInfo {
+public class FinalizedReplica extends LocalReplica {
 
   /**
    * Constructor
@@ -88,4 +89,28 @@ public class FinalizedReplica extends ReplicaInfo {
   public String toString() {
     return super.toString();
   }
+
+  @Override
+  public ReplicaInfo getOriginalReplica() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getOriginalReplica");
+  }
+
+  @Override
+  public long getRecoveryID() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getRecoveryID");
+  }
+
+  @Override
+  public void setRecoveryID(long recoveryId) {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support setRecoveryID");
+  }
+
+  @Override
+  public ReplicaRecoveryInfo createInfo() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support createInfo");
+  }
 }

+ 479 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java

@@ -0,0 +1,479 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfo;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.DataChecksum;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class is used for all replicas which are on local storage media
+ * and hence, are backed by files.
+ */
+abstract public class LocalReplica extends ReplicaInfo {
+
+  /**
+   * Base directory containing numerically-identified sub directories and
+   * possibly blocks.
+   */
+  private File baseDir;
+
+  /**
+   * Whether or not this replica's parent directory includes subdirs, in which
+   * case we can generate them based on the replica's block ID
+   */
+  private boolean hasSubdirs;
+
+  private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
+
+  static final Log LOG = LogFactory.getLog(LocalReplica.class);
+  private final static boolean IS_NATIVE_IO_AVAIL;
+  static {
+    IS_NATIVE_IO_AVAIL = NativeIO.isAvailable();
+    if (Path.WINDOWS && !IS_NATIVE_IO_AVAIL) {
+      LOG.warn("Data node cannot fully support concurrent reading"
+          + " and writing without native code extensions on Windows.");
+    }
+  }
+
+  /**
+   * Constructor
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  LocalReplica(Block block, FsVolumeSpi vol, File dir) {
+    this(block.getBlockId(), block.getNumBytes(),
+        block.getGenerationStamp(), vol, dir);
+  }
+
+  /**
+   * Constructor
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   */
+  LocalReplica(long blockId, long len, long genStamp,
+      FsVolumeSpi vol, File dir) {
+    super(vol, blockId, len, genStamp);
+    setDirInternal(dir);
+  }
+
+  /**
+   * Copy constructor.
+   * @param from the source replica
+   */
+  LocalReplica(LocalReplica from) {
+    this(from, from.getVolume(), from.getDir());
+  }
+
+  /**
+   * Get the full path of this replica's data file.
+   * @return the full path of this replica's data file
+   */
+  @VisibleForTesting
+  public File getBlockFile() {
+    return new File(getDir(), getBlockName());
+  }
+
+  /**
+   * Get the full path of this replica's meta file.
+   * @return the full path of this replica's meta file
+   */
+  @VisibleForTesting
+  public File getMetaFile() {
+    return new File(getDir(),
+        DatanodeUtil.getMetaName(getBlockName(), getGenerationStamp()));
+  }
+
+  /**
+   * Return the parent directory path where this replica is located.
+   * @return the parent directory path where this replica is located
+   */
+  protected File getDir() {
+    return hasSubdirs ? DatanodeUtil.idToBlockDir(baseDir,
+        getBlockId()) : baseDir;
+  }
+
+  /**
+   * Set the parent directory where this replica is located.
+   * @param dir the parent directory where the replica is located
+   */
+  private void setDirInternal(File dir) {
+    if (dir == null) {
+      baseDir = null;
+      return;
+    }
+
+    ReplicaDirInfo dirInfo = parseBaseDir(dir);
+    this.hasSubdirs = dirInfo.hasSubidrs;
+
+    synchronized (internedBaseDirs) {
+      if (!internedBaseDirs.containsKey(dirInfo.baseDirPath)) {
+        // Create a new String path of this file and make a brand new File object
+        // to guarantee we drop the reference to the underlying char[] storage.
+        File baseDir = new File(dirInfo.baseDirPath);
+        internedBaseDirs.put(dirInfo.baseDirPath, baseDir);
+      }
+      this.baseDir = internedBaseDirs.get(dirInfo.baseDirPath);
+    }
+  }
+
+  @VisibleForTesting
+  public static class ReplicaDirInfo {
+    public String baseDirPath;
+    public boolean hasSubidrs;
+
+    public ReplicaDirInfo (String baseDirPath, boolean hasSubidrs) {
+      this.baseDirPath = baseDirPath;
+      this.hasSubidrs = hasSubidrs;
+    }
+  }
+
+  @VisibleForTesting
+  public static ReplicaDirInfo parseBaseDir(File dir) {
+
+    File currentDir = dir;
+    boolean hasSubdirs = false;
+    while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) {
+      hasSubdirs = true;
+      currentDir = currentDir.getParentFile();
+    }
+
+    return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs);
+  }
+
+  /**
+   * Copy specified file into a temporary file. Then rename the
+   * temporary file to the original name. This will cause any
+   * hardlinks to the original file to be removed. The temporary
+   * files are created in the same directory. The temporary files will
+   * be recovered (especially on Windows) on datanode restart.
+   */
+  private void breakHardlinks(File file, Block b) throws IOException {
+    File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
+    try (FileInputStream in = new FileInputStream(file)) {
+      try (FileOutputStream out = new FileOutputStream(tmpFile)){
+        IOUtils.copyBytes(in, out, 16 * 1024);
+      }
+      if (file.length() != tmpFile.length()) {
+        throw new IOException("Copy of file " + file + " size " + file.length()+
+                              " into file " + tmpFile +
+                              " resulted in a size of " + tmpFile.length());
+      }
+      FileUtil.replaceFile(tmpFile, file);
+    } catch (IOException e) {
+      boolean done = tmpFile.delete();
+      if (!done) {
+        DataNode.LOG.info("detachFile failed to delete temporary file " +
+                          tmpFile);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * This function "breaks hardlinks" to the current replica file.
+   *
+   * When doing a DataNode upgrade, we create a bunch of hardlinks to each block
+   * file.  This cleverly ensures that both the old and the new storage
+   * directories can contain the same block file, without using additional space
+   * for the data.
+   *
+   * However, when we want to append to the replica file, we need to "break" the
+   * hardlink to ensure that the old snapshot continues to contain the old data
+   * length.  If we failed to do that, we could roll back to the previous/
+   * directory during a downgrade, and find that the block contents were longer
+   * than they were at the time of upgrade.
+   *
+   * @return true only if data was copied.
+   * @throws IOException
+   */
+  public boolean breakHardLinksIfNeeded() throws IOException {
+    File file = getBlockFile();
+    if (file == null || getVolume() == null) {
+      throw new IOException("detachBlock:Block not found. " + this);
+    }
+    File meta = getMetaFile();
+
+    int linkCount = HardLink.getLinkCount(file);
+    if (linkCount > 1) {
+      DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
+          "block " + this);
+      breakHardlinks(file, this);
+    }
+    if (HardLink.getLinkCount(meta) > 1) {
+      breakHardlinks(meta, this);
+    }
+    return true;
+  }
+
+  @Override
+  public URI getBlockURI() {
+    return getBlockFile().toURI();
+  }
+
+  @Override
+  public InputStream getDataInputStream(long seekOffset) throws IOException {
+
+    File blockFile = getBlockFile();
+    if (IS_NATIVE_IO_AVAIL) {
+      return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
+    } else {
+      try {
+        return FsDatasetUtil.openAndSeek(blockFile, seekOffset);
+      } catch (FileNotFoundException fnfe) {
+        throw new IOException("Block " + this + " is not valid. " +
+            "Expected block file at " + blockFile + " does not exist.");
+      }
+    }
+  }
+
+  @Override
+  public OutputStream getDataOutputStream(boolean append) throws IOException {
+    return new FileOutputStream(getBlockFile(), append);
+  }
+
+  @Override
+  public boolean blockDataExists() {
+    return getBlockFile().exists();
+  }
+
+  @Override
+  public boolean deleteBlockData() {
+    return getBlockFile().delete();
+  }
+
+  @Override
+  public long getBlockDataLength() {
+    return getBlockFile().length();
+  }
+
+  @Override
+  public URI getMetadataURI() {
+    return getMetaFile().toURI();
+  }
+
+  @Override
+  public LengthInputStream getMetadataInputStream(long offset)
+      throws IOException {
+    File meta = getMetaFile();
+    return new LengthInputStream(
+        FsDatasetUtil.openAndSeek(meta, offset), meta.length());
+  }
+
+  @Override
+  public OutputStream getMetadataOutputStream(boolean append)
+      throws IOException {
+    return new FileOutputStream(getMetaFile(), append);
+  }
+
+  @Override
+  public boolean metadataExists() {
+    return getMetaFile().exists();
+  }
+
+  @Override
+  public boolean deleteMetadata() {
+    return getMetaFile().delete();
+  }
+
+  @Override
+  public long getMetadataLength() {
+    return getMetaFile().length();
+  }
+
+  @Override
+  public boolean renameMeta(URI destURI) throws IOException {
+    return renameFile(getMetaFile(), new File(destURI));
+  }
+
+  @Override
+  public boolean renameData(URI destURI) throws IOException {
+    return renameFile(getBlockFile(), new File(destURI));
+  }
+
+  private boolean renameFile(File srcfile, File destfile) throws IOException {
+    try {
+      NativeIO.renameTo(srcfile, destfile);
+      return true;
+    } catch (IOException e) {
+      throw new IOException("Failed to move block file for " + this
+          + " from " + srcfile + " to " + destfile.getAbsolutePath(), e);
+    }
+  }
+
+  @Override
+  public void updateWithReplica(StorageLocation replicaLocation) {
+    // for local replicas, the replica location is assumed to be a file.
+    File diskFile = replicaLocation.getFile();
+    if (null == diskFile) {
+      setDirInternal(null);
+    } else {
+      setDirInternal(diskFile.getParentFile());
+    }
+  }
+
+  @Override
+  public boolean getPinning(LocalFileSystem localFS) throws IOException {
+    FileStatus fss =
+        localFS.getFileStatus(new Path(getBlockFile().getAbsolutePath()));
+    return fss.getPermission().getStickyBit();
+  }
+
+  @Override
+  public void setPinning(LocalFileSystem localFS) throws IOException {
+    File f = getBlockFile();
+    Path p = new Path(f.getAbsolutePath());
+
+    FsPermission oldPermission = localFS.getFileStatus(
+        new Path(f.getAbsolutePath())).getPermission();
+    //sticky bit is used for pinning purpose
+    FsPermission permission = new FsPermission(oldPermission.getUserAction(),
+        oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
+    localFS.setPermission(p, permission);
+  }
+
+  @Override
+  public void bumpReplicaGS(long newGS) throws IOException {
+    long oldGS = getGenerationStamp();
+    File oldmeta = getMetaFile();
+    setGenerationStamp(newGS);
+    File newmeta = getMetaFile();
+
+    // rename meta file to new GS
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+    }
+    try {
+      // calling renameMeta on the ReplicaInfo doesn't work here
+      NativeIO.renameTo(oldmeta, newmeta);
+    } catch (IOException e) {
+      setGenerationStamp(oldGS); // restore old GS
+      throw new IOException("Block " + this + " reopen failed. " +
+                            " Unable to move meta file  " + oldmeta +
+                            " to " + newmeta, e);
+    }
+  }
+
+  @Override
+  public void truncateBlock(long newLength) throws IOException {
+    truncateBlock(getBlockFile(), getMetaFile(), getNumBytes(), newLength);
+  }
+
+  @Override
+  public int compareWith(ScanInfo info) {
+    return info.getBlockFile().compareTo(getBlockFile());
+  }
+
+  static public void truncateBlock(File blockFile, File metaFile,
+      long oldlen, long newlen) throws IOException {
+    LOG.info("truncateBlock: blockFile=" + blockFile
+        + ", metaFile=" + metaFile
+        + ", oldlen=" + oldlen
+        + ", newlen=" + newlen);
+
+    if (newlen == oldlen) {
+      return;
+    }
+    if (newlen > oldlen) {
+      throw new IOException("Cannot truncate block from oldlen (=" + oldlen
+          + ") to newlen (=" + newlen + ")");
+    }
+
+    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
+    int checksumsize = dcs.getChecksumSize();
+    int bpc = dcs.getBytesPerChecksum();
+    long n = (newlen - 1)/bpc + 1;
+    long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize;
+    long lastchunkoffset = (n - 1)*bpc;
+    int lastchunksize = (int)(newlen - lastchunkoffset);
+    byte[] b = new byte[Math.max(lastchunksize, checksumsize)];
+
+    RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
+    try {
+      //truncate blockFile
+      blockRAF.setLength(newlen);
+
+      //read last chunk
+      blockRAF.seek(lastchunkoffset);
+      blockRAF.readFully(b, 0, lastchunksize);
+    } finally {
+      blockRAF.close();
+    }
+
+    //compute checksum
+    dcs.update(b, 0, lastchunksize);
+    dcs.writeValue(b, 0, false);
+
+    //update metaFile
+    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
+    try {
+      metaRAF.setLength(newmetalen);
+      metaRAF.seek(newmetalen - checksumsize);
+      metaRAF.write(b, 0, checksumsize);
+    } finally {
+      metaRAF.close();
+    }
+  }
+
+  @Override
+  public void copyMetadata(URI destination) throws IOException {
+    //for local replicas, we assume the destination URI is file
+    Storage.nativeCopyFileUnbuffered(getMetaFile(),
+        new File(destination), true);
+  }
+
+  @Override
+  public void copyBlockdata(URI destination) throws IOException {
+    //for local replicas, we assume the destination URI is file
+    Storage.nativeCopyFileUnbuffered(getBlockFile(),
+        new File(destination), true);
+  }
+
+}
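
The arithmetic in truncateBlock() above recomputes how many checksum chunks the shortened block needs and where the last partial chunk begins. A worked example under assumed values (bytesPerChecksum = 512, checksumSize = 4; the meta-file header length is omitted since it comes from BlockMetadataHeader.getHeaderSize()):

class TruncateMathExample {
  public static void main(String[] args) {
    long newlen = 1000;   // truncated block length in bytes (assumed)
    int bpc = 512;        // bytes per checksum chunk (assumed)
    int checksumSize = 4; // bytes per stored checksum value (assumed)

    long n = (newlen - 1) / bpc + 1;                       // 2 chunks cover 1000 bytes
    long lastChunkOffset = (n - 1) * bpc;                  // 512
    int lastChunkSize = (int) (newlen - lastChunkOffset);  // 488 bytes to re-checksum
    long checksumBytes = n * checksumSize;                 // 8 checksum bytes after the header

    System.out.println(n + " chunks, last chunk at offset " + lastChunkOffset
        + ", " + lastChunkSize + " data bytes to re-checksum, "
        + checksumBytes + " checksum bytes after the header");
  }
}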

+ 417 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java

@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This class defines a replica in a pipeline, which
+ * includes a persistent replica being written to by a dfs client or
+ * a temporary replica being replicated by a source datanode or
+ * being copied for the balancing purpose.
+ *
+ * The base class implements a temporary replica
+ */
+public class LocalReplicaInPipeline extends LocalReplica
+                        implements ReplicaInPipeline {
+  private long bytesAcked;
+  private long bytesOnDisk;
+  private byte[] lastChecksum;
+  private AtomicReference<Thread> writer = new AtomicReference<Thread>();
+
+  /**
+   * Bytes reserved for this replica on the containing volume.
+   * Based off difference between the estimated maximum block length and
+   * the bytes already written to this block.
+   */
+  private long bytesReserved;
+  private final long originalBytesReserved;
+
+  /**
+   * Constructor for a zero length replica.
+   * @param blockId block id
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param bytesToReserve disk space to reserve for this replica, based on
+   *                       the estimated maximum block length.
+   */
+  public LocalReplicaInPipeline(long blockId, long genStamp,
+        FsVolumeSpi vol, File dir, long bytesToReserve) {
+    this(blockId, 0L, genStamp, vol, dir, Thread.currentThread(),
+        bytesToReserve);
+  }
+
+  /**
+   * Constructor
+   * @param block a block
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param writer a thread that is writing to this replica
+   */
+  LocalReplicaInPipeline(Block block,
+      FsVolumeSpi vol, File dir, Thread writer) {
+    this(block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(),
+        vol, dir, writer, 0L);
+  }
+
+  /**
+   * Constructor
+   * @param blockId block id
+   * @param len replica length
+   * @param genStamp replica generation stamp
+   * @param vol volume where replica is located
+   * @param dir directory path where block and meta files are located
+   * @param writer a thread that is writing to this replica
+   * @param bytesToReserve disk space to reserve for this replica, based on
+   *                       the estimated maximum block length.
+   */
+  LocalReplicaInPipeline(long blockId, long len, long genStamp,
+      FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) {
+    super(blockId, len, genStamp, vol, dir);
+    this.bytesAcked = len;
+    this.bytesOnDisk = len;
+    this.writer.set(writer);
+    this.bytesReserved = bytesToReserve;
+    this.originalBytesReserved = bytesToReserve;
+  }
+
+  /**
+   * Copy constructor.
+   * @param from where to copy from
+   */
+  public LocalReplicaInPipeline(LocalReplicaInPipeline from) {
+    super(from);
+    this.bytesAcked = from.getBytesAcked();
+    this.bytesOnDisk = from.getBytesOnDisk();
+    this.writer.set(from.writer.get());
+    this.bytesReserved = from.bytesReserved;
+    this.originalBytesReserved = from.originalBytesReserved;
+  }
+
+  @Override
+  public long getVisibleLength() {
+    return -1;
+  }
+
+  @Override  //ReplicaInfo
+  public ReplicaState getState() {
+    return ReplicaState.TEMPORARY;
+  }
+
+  @Override // ReplicaInPipeline
+  public long getBytesAcked() {
+    return bytesAcked;
+  }
+
+  @Override // ReplicaInPipeline
+  public void setBytesAcked(long bytesAcked) {
+    long newBytesAcked = bytesAcked - this.bytesAcked;
+    this.bytesAcked = bytesAcked;
+
+    // Once bytes are ACK'ed we can release equivalent space from the
+    // volume's reservedForRbw count. We could have released it as soon
+    // as the write-to-disk completed but that would be inefficient.
+    getVolume().releaseReservedSpace(newBytesAcked);
+    bytesReserved -= newBytesAcked;
+  }
+
+  @Override // ReplicaInPipeline
+  public long getBytesOnDisk() {
+    return bytesOnDisk;
+  }
+
+  @Override
+  public long getBytesReserved() {
+    return bytesReserved;
+  }
+
+  @Override
+  public long getOriginalBytesReserved() {
+    return originalBytesReserved;
+  }
+
+  @Override // ReplicaInPipeline
+  public void releaseAllBytesReserved() {
+    getVolume().releaseReservedSpace(bytesReserved);
+    getVolume().releaseLockedMemory(bytesReserved);
+    bytesReserved = 0;
+  }
+
+  @Override // ReplicaInPipeline
+  public synchronized void setLastChecksumAndDataLen(long dataLength,
+      byte[] checksum) {
+    this.bytesOnDisk = dataLength;
+    this.lastChecksum = checksum;
+  }
+
+  @Override // ReplicaInPipeline
+  public synchronized ChunkChecksum getLastChecksumAndDataLen() {
+    return new ChunkChecksum(getBytesOnDisk(), lastChecksum);
+  }
+
+  @Override // ReplicaInPipeline
+  public void setWriter(Thread writer) {
+    this.writer.set(writer);
+  }
+
+  @Override
+  public void interruptThread() {
+    Thread thread = writer.get();
+    if (thread != null && thread != Thread.currentThread()
+        && thread.isAlive()) {
+      thread.interrupt();
+    }
+  }
+
+  @Override  // Object
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+
+  /**
+   * Attempt to set the writer to a new value.
+   */
+  @Override // ReplicaInPipeline
+  public boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) {
+    return writer.compareAndSet(prevWriter, newWriter);
+  }
+
+  /**
+   * Interrupt the writing thread and wait until it dies.
+   * @throws IOException the waiting is interrupted
+   */
+  @Override // ReplicaInPipeline
+  public void stopWriter(long xceiverStopTimeout) throws IOException {
+    while (true) {
+      Thread thread = writer.get();
+      if ((thread == null) || (thread == Thread.currentThread()) ||
+          (!thread.isAlive())) {
+        if (writer.compareAndSet(thread, null)) {
+          return; // Done
+        }
+        // The writer changed.  Go back to the start of the loop and attempt to
+        // stop the new writer.
+        continue;
+      }
+      thread.interrupt();
+      try {
+        thread.join(xceiverStopTimeout);
+        if (thread.isAlive()) {
+          // Our thread join timed out.
+          final String msg = "Join on writer thread " + thread + " timed out";
+          DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(thread));
+          throw new IOException(msg);
+        }
+      } catch (InterruptedException e) {
+        throw new IOException("Waiting for writer thread is interrupted.");
+      }
+    }
+  }
+
+  @Override  // Object
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  @Override // ReplicaInPipeline
+  public ReplicaOutputStreams createStreams(boolean isCreate,
+      DataChecksum requestedChecksum) throws IOException {
+    File blockFile = getBlockFile();
+    File metaFile = getMetaFile();
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("writeTo blockfile is " + blockFile +
+                         " of size " + blockFile.length());
+      DataNode.LOG.debug("writeTo metafile is " + metaFile +
+                         " of size " + metaFile.length());
+    }
+    long blockDiskSize = 0L;
+    long crcDiskSize = 0L;
+
+    // the checksum that should actually be used -- this
+    // may differ from requestedChecksum for appends.
+    final DataChecksum checksum;
+
+    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
+
+    if (!isCreate) {
+      // For append or recovery, we must enforce the existing checksum.
+      // Also, verify that the file has correct lengths, etc.
+      boolean checkedMeta = false;
+      try {
+        BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
+        checksum = header.getChecksum();
+
+        if (checksum.getBytesPerChecksum() !=
+            requestedChecksum.getBytesPerChecksum()) {
+          throw new IOException("Client requested checksum " +
+              requestedChecksum + " when appending to an existing block " +
+              "with different chunk size: " + checksum);
+        }
+
+        int bytesPerChunk = checksum.getBytesPerChecksum();
+        int checksumSize = checksum.getChecksumSize();
+
+        blockDiskSize = bytesOnDisk;
+        crcDiskSize = BlockMetadataHeader.getHeaderSize() +
+          (blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize;
+        if (blockDiskSize > 0 &&
+            (blockDiskSize > blockFile.length() ||
+               crcDiskSize>metaFile.length())) {
+          throw new IOException("Corrupted block: " + this);
+        }
+        checkedMeta = true;
+      } finally {
+        if (!checkedMeta) {
+          // clean up in case of exceptions.
+          IOUtils.closeStream(metaRAF);
+        }
+      }
+    } else {
+      // for create, we can use the requested checksum
+      checksum = requestedChecksum;
+    }
+
+    FileOutputStream blockOut = null;
+    FileOutputStream crcOut = null;
+    try {
+      blockOut = new FileOutputStream(
+          new RandomAccessFile(blockFile, "rw").getFD());
+      crcOut = new FileOutputStream(metaRAF.getFD());
+      if (!isCreate) {
+        blockOut.getChannel().position(blockDiskSize);
+        crcOut.getChannel().position(crcDiskSize);
+      }
+      return new ReplicaOutputStreams(blockOut, crcOut, checksum,
+          getVolume().isTransientStorage());
+    } catch (IOException e) {
+      IOUtils.closeStream(blockOut);
+      IOUtils.closeStream(metaRAF);
+      throw e;
+    }
+  }
+
+  @Override
+  public OutputStream createRestartMetaStream() throws IOException {
+    File blockFile = getBlockFile();
+    File restartMeta = new File(blockFile.getParent()  +
+        File.pathSeparator + "." + blockFile.getName() + ".restart");
+    if (restartMeta.exists() && !restartMeta.delete()) {
+      DataNode.LOG.warn("Failed to delete restart meta file: " +
+          restartMeta.getPath());
+    }
+    return new FileOutputStream(restartMeta);
+  }
+
+  @Override
+  public String toString() {
+    return super.toString()
+        + "\n  bytesAcked=" + bytesAcked
+        + "\n  bytesOnDisk=" + bytesOnDisk;
+  }
+
+  @Override
+  public ReplicaInfo getOriginalReplica() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getOriginalReplica");
+  }
+
+  @Override
+  public long getRecoveryID() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getRecoveryID");
+  }
+
+  @Override
+  public void setRecoveryID(long recoveryId) {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support setRecoveryID");
+  }
+
+  @Override
+  public ReplicaRecoveryInfo createInfo(){
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support createInfo");
+  }
+
+  public void moveReplicaFrom(ReplicaInfo oldReplicaInfo, File newBlkFile)
+      throws IOException {
+
+    if (!(oldReplicaInfo instanceof LocalReplica)) {
+      throw new IOException("The source replica with blk id "
+          + oldReplicaInfo.getBlockId()
+          + " should be derived from LocalReplica");
+    }
+
+    LocalReplica localReplica = (LocalReplica) oldReplicaInfo;
+
+    File oldmeta = localReplica.getMetaFile();
+    File newmeta = getMetaFile();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming " + oldmeta + " to " + newmeta);
+    }
+    try {
+      NativeIO.renameTo(oldmeta, newmeta);
+    } catch (IOException e) {
+      throw new IOException("Block " + oldReplicaInfo + " reopen failed. " +
+                            " Unable to move meta file  " + oldmeta +
+                            " to rbw dir " + newmeta, e);
+    }
+
+    File blkfile = localReplica.getBlockFile();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Renaming " + blkfile + " to " + newBlkFile
+          + ", file length=" + blkfile.length());
+    }
+    try {
+      NativeIO.renameTo(blkfile, newBlkFile);
+    } catch (IOException e) {
+      try {
+        NativeIO.renameTo(newmeta, oldmeta);
+      } catch (IOException ex) {
+        LOG.warn("Cannot move meta file " + newmeta +
+            " back to the finalized directory " + oldmeta, ex);
+      }
+      throw new IOException("Block " + oldReplicaInfo + " reopen failed. " +
+                              " Unable to move block file " + blkfile +
+                              " to rbw dir " + newBlkFile, e);
+    }
+  }
+
+  @Override // ReplicaInPipeline
+  public ReplicaInfo getReplicaInfo() {
+    return this;
+  }
+}
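
LocalReplicaInPipeline tracks its writer thread in an AtomicReference and hands ownership over with compareAndSet, which is what makes attemptToSetWriter and stopWriter safe against races. A standalone sketch of that pattern (WriterHandoffExample is a hypothetical name, not HDFS code):

import java.util.concurrent.atomic.AtomicReference;

class WriterHandoffExample {
  private final AtomicReference<Thread> writer = new AtomicReference<>();

  // Mirrors ReplicaInPipeline#attemptToSetWriter: succeeds only if the
  // recorded writer is still exactly prevWriter.
  boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) {
    return writer.compareAndSet(prevWriter, newWriter);
  }

  public static void main(String[] args) {
    WriterHandoffExample replica = new WriterHandoffExample();
    Thread current = Thread.currentThread();
    System.out.println(replica.attemptToSetWriter(null, current));      // true: claimed
    System.out.println(replica.attemptToSetWriter(null, new Thread())); // false: already owned
  }
}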

+ 8 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java

@@ -27,9 +27,9 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
  * Those are the replicas that
  * are created in a pipeline initiated by a dfs client.
  */
-public class ReplicaBeingWritten extends ReplicaInPipeline {
+public class ReplicaBeingWritten extends LocalReplicaInPipeline {
   /**
-   * Constructor for a zero length replica
+   * Constructor for a zero length replica.
    * @param blockId block id
    * @param genStamp replica generation stamp
    * @param vol volume where replica is located
@@ -37,25 +37,25 @@ public class ReplicaBeingWritten extends ReplicaInPipeline {
    * @param bytesToReserve disk space to reserve for this replica, based on
    *                       the estimated maximum block length.
    */
-  public ReplicaBeingWritten(long blockId, long genStamp, 
+  public ReplicaBeingWritten(long blockId, long genStamp,
         FsVolumeSpi vol, File dir, long bytesToReserve) {
     super(blockId, genStamp, vol, dir, bytesToReserve);
   }
-  
+
   /**
-   * Constructor
+   * Constructor.
    * @param block a block
    * @param vol volume where replica is located
    * @param dir directory path where block and meta files are located
    * @param writer a thread that is writing to this replica
    */
-  public ReplicaBeingWritten(Block block, 
+  public ReplicaBeingWritten(Block block,
       FsVolumeSpi vol, File dir, Thread writer) {
-    super( block, vol, dir, writer);
+    super(block, vol, dir, writer);
   }
 
   /**
-   * Constructor
+   * Constructor.
    * @param blockId block id
    * @param len replica length
    * @param genStamp replica generation stamp

+ 252 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java

@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+
+/**
+ * This class is to be used as a builder for {@link ReplicaInfo} objects.
+ * The state of the replica is used to determine which object is instantiated.
+ */
+public class ReplicaBuilder {
+
+  private ReplicaState state;
+  private long blockId;
+  private long genStamp;
+  private long length;
+  private FsVolumeSpi volume;
+  private File directoryUsed;
+  private long bytesToReserve;
+  private Thread writer;
+  private long recoveryId;
+  private Block block;
+
+  private ReplicaInfo fromReplica;
+
+  public ReplicaBuilder(ReplicaState state) {
+    volume = null;
+    writer = null;
+    block = null;
+    length = -1;
+    this.state = state;
+  }
+
+  public ReplicaBuilder setState(ReplicaState state) {
+    this.state = state;
+    return this;
+  }
+
+  public ReplicaBuilder setBlockId(long blockId) {
+    this.blockId = blockId;
+    return this;
+  }
+
+  public ReplicaBuilder setGenerationStamp(long genStamp) {
+    this.genStamp = genStamp;
+    return this;
+  }
+
+  public ReplicaBuilder setLength(long length) {
+    this.length = length;
+    return this;
+  }
+
+  public ReplicaBuilder setFsVolume(FsVolumeSpi volume) {
+    this.volume = volume;
+    return this;
+  }
+
+  public ReplicaBuilder setDirectoryToUse(File dir) {
+    this.directoryUsed = dir;
+    return this;
+  }
+
+  public ReplicaBuilder setBytesToReserve(long bytesToReserve) {
+    this.bytesToReserve = bytesToReserve;
+    return this;
+  }
+
+  public ReplicaBuilder setWriterThread(Thread writer) {
+    this.writer = writer;
+    return this;
+  }
+
+  public ReplicaBuilder from(ReplicaInfo fromReplica) {
+    this.fromReplica = fromReplica;
+    return this;
+  }
+
+  public ReplicaBuilder setRecoveryId(long recoveryId) {
+    this.recoveryId = recoveryId;
+    return this;
+  }
+
+  public ReplicaBuilder setBlock(Block block) {
+    this.block = block;
+    return this;
+  }
+
+  public LocalReplicaInPipeline buildLocalReplicaInPipeline()
+      throws IllegalArgumentException {
+    LocalReplicaInPipeline info = null;
+    switch(state) {
+    case RBW:
+      info = buildRBW();
+      break;
+    case TEMPORARY:
+      info = buildTemporaryReplica();
+      break;
+    default:
+      throw new IllegalArgumentException("Unknown replica state " + state);
+    }
+    return info;
+  }
+
+  private LocalReplicaInPipeline buildRBW() throws IllegalArgumentException {
+    if (null != fromReplica && fromReplica.getState() == ReplicaState.RBW) {
+      return new ReplicaBeingWritten((ReplicaBeingWritten) fromReplica);
+    } else if (null != fromReplica) {
+      throw new IllegalArgumentException("Incompatible fromReplica "
+          + "state: " + fromReplica.getState());
+    } else {
+      if (null != block) {
+        if (null == writer) {
+          throw new IllegalArgumentException("A valid writer is "
+              + "required for constructing a RBW from block "
+              + block.getBlockId());
+        }
+        return new ReplicaBeingWritten(block, volume, directoryUsed, writer);
+      } else {
+        if (length != -1) {
+          return new ReplicaBeingWritten(blockId, length, genStamp,
+              volume, directoryUsed, writer, bytesToReserve);
+        } else {
+          return new ReplicaBeingWritten(blockId, genStamp, volume,
+              directoryUsed, bytesToReserve);
+        }
+      }
+    }
+  }
+
+  private LocalReplicaInPipeline buildTemporaryReplica()
+      throws IllegalArgumentException {
+    if (null != fromReplica &&
+        fromReplica.getState() == ReplicaState.TEMPORARY) {
+      return new LocalReplicaInPipeline((LocalReplicaInPipeline) fromReplica);
+    } else if (null != fromReplica) {
+      throw new IllegalArgumentException("Incompatible fromReplica "
+          + "state: " + fromReplica.getState());
+    } else {
+      if (null != block) {
+        if (null == writer) {
+          throw new IllegalArgumentException("A valid writer is "
+              + "required for constructing a Replica from block "
+              + block.getBlockId());
+        }
+        return new LocalReplicaInPipeline(block, volume, directoryUsed,
+            writer);
+      } else {
+        if (length != -1) {
+          return new LocalReplicaInPipeline(blockId, length, genStamp,
+              volume, directoryUsed, writer, bytesToReserve);
+        } else {
+          return new LocalReplicaInPipeline(blockId, genStamp, volume,
+              directoryUsed, bytesToReserve);
+        }
+      }
+    }
+  }
+
+  private ReplicaInfo buildFinalizedReplica() throws IllegalArgumentException {
+    if (null != fromReplica &&
+        fromReplica.getState() == ReplicaState.FINALIZED) {
+      return new FinalizedReplica((FinalizedReplica)fromReplica);
+    } else if (null != this.fromReplica) {
+      throw new IllegalArgumentException("Incompatible fromReplica "
+          + "state: " + fromReplica.getState());
+    } else {
+      if (null != block) {
+        return new FinalizedReplica(block, volume, directoryUsed);
+      } else {
+        return new FinalizedReplica(blockId, length, genStamp, volume,
+            directoryUsed);
+      }
+    }
+  }
+
+  private ReplicaInfo buildRWR() throws IllegalArgumentException {
+
+    if (null != fromReplica && fromReplica.getState() == ReplicaState.RWR) {
+      return new ReplicaWaitingToBeRecovered(
+          (ReplicaWaitingToBeRecovered) fromReplica);
+    } else if (null != fromReplica){
+      throw new IllegalArgumentException("Incompatible fromReplica "
+          + "state: " + fromReplica.getState());
+    } else {
+      if (null != block) {
+        return new ReplicaWaitingToBeRecovered(block, volume, directoryUsed);
+      } else {
+        return new ReplicaWaitingToBeRecovered(blockId, length, genStamp,
+            volume, directoryUsed);
+      }
+    }
+  }
+
+  private ReplicaInfo buildRUR() throws IllegalArgumentException {
+    if (null == fromReplica) {
+      throw new IllegalArgumentException(
+          "Missing a valid replica to recover from");
+    }
+    if (null != writer || null != block) {
+      throw new IllegalArgumentException("Invalid state for "
+          + "recovering from replica with blk id "
+          + fromReplica.getBlockId());
+    }
+    if (fromReplica.getState() == ReplicaState.RUR) {
+      return new ReplicaUnderRecovery((ReplicaUnderRecovery) fromReplica);
+    } else {
+      return new ReplicaUnderRecovery(fromReplica, recoveryId);
+    }
+  }
+
+  public ReplicaInfo build() throws IllegalArgumentException {
+    ReplicaInfo info = null;
+    switch(this.state) {
+    case FINALIZED:
+      info = buildFinalizedReplica();
+      break;
+    case RWR:
+      info = buildRWR();
+      break;
+    case RUR:
+      info = buildRUR();
+      break;
+    case RBW:
+    case TEMPORARY:
+      info = buildLocalReplicaInPipeline();
+      break;
+    default:
+      throw new IllegalArgumentException("Unknown replica state " + state);
+    }
+    return info;
+  }
+}
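
A hypothetical usage sketch of the new builder: the ReplicaState given to the constructor decides which concrete ReplicaInfo subclass build() returns. The block id, length, and generation stamp below are made-up values, and volume/dir stand in for a real FsVolumeSpi and block directory.

import java.io.File;

import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

class ReplicaBuilderExample {
  static ReplicaInfo newFinalized(FsVolumeSpi volume, File dir) {
    // FINALIZED routes build() to buildFinalizedReplica(), so the result
    // is a FinalizedReplica.
    return new ReplicaBuilder(ReplicaState.FINALIZED)
        .setBlockId(1001L)
        .setLength(4096L)
        .setGenerationStamp(1L)
        .setFsVolume(volume)
        .setDirectoryToUse(dir)
        .build();
  }
}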

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaHandler.java

@@ -27,11 +27,11 @@ import java.io.IOException;
  * the fs volume where this replica is located.
  */
 public class ReplicaHandler implements Closeable {
-  private final ReplicaInPipelineInterface replica;
+  private final ReplicaInPipeline replica;
   private final FsVolumeReference volumeReference;
 
   public ReplicaHandler(
-      ReplicaInPipelineInterface replica, FsVolumeReference reference) {
+      ReplicaInPipeline replica, FsVolumeReference reference) {
     this.replica = replica;
     this.volumeReference = reference;
   }
@@ -43,7 +43,7 @@ public class ReplicaHandler implements Closeable {
     }
   }
 
-  public ReplicaInPipelineInterface getReplica() {
+  public ReplicaInPipeline getReplica() {
     return replica;
   }
 }
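
ReplicaHandler pairs the replica being written with a reference on its volume, and closing the handler is what releases that reference. A minimal usage sketch (ReplicaHandlerExample is a hypothetical name; the handler itself would normally come from an FsDatasetSpi call such as createRbw):

import java.io.IOException;

import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;

class ReplicaHandlerExample {
  static long bytesAckedOf(ReplicaHandler handler) throws IOException {
    // try-with-resources closes the handler, which releases the reference
    // held on the replica's volume.
    try (ReplicaHandler h = handler) {
      ReplicaInPipeline replica = h.getReplica();
      return replica.getBytesAcked();
    }
  }
}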

+ 51 - 273
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java

@@ -17,313 +17,91 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.RandomAccessFile;
-import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.StringUtils;
 
 /** 
- * This class defines a replica in a pipeline, which
- * includes a persistent replica being written to by a dfs client or
- * a temporary replica being replicated by a source datanode or
- * being copied for the balancing purpose.
- * 
- * The base class implements a temporary replica
+ * This defines the interface of a replica in a pipeline that is being written to.
  */
-public class ReplicaInPipeline extends ReplicaInfo
-                        implements ReplicaInPipelineInterface {
-  private long bytesAcked;
-  private long bytesOnDisk;
-  private byte[] lastChecksum;  
-  private AtomicReference<Thread> writer = new AtomicReference<Thread>();
-
+public interface ReplicaInPipeline extends Replica {
   /**
-   * Bytes reserved for this replica on the containing volume.
-   * Based off difference between the estimated maximum block length and
-   * the bytes already written to this block.
+   * Set the number of bytes received
+   * @param bytesReceived number of bytes received
    */
-  private long bytesReserved;
-  private final long originalBytesReserved;
+  void setNumBytes(long bytesReceived);
 
   /**
-   * Constructor for a zero length replica
-   * @param blockId block id
-   * @param genStamp replica generation stamp
-   * @param vol volume where replica is located
-   * @param dir directory path where block and meta files are located
-   * @param bytesToReserve disk space to reserve for this replica, based on
-   *                       the estimated maximum block length.
+   * Get the number of bytes acked
+   * @return the number of bytes acked
    */
-  public ReplicaInPipeline(long blockId, long genStamp, 
-        FsVolumeSpi vol, File dir, long bytesToReserve) {
-    this(blockId, 0L, genStamp, vol, dir, Thread.currentThread(), bytesToReserve);
-  }
+  long getBytesAcked();
 
   /**
-   * Constructor
-   * @param block a block
-   * @param vol volume where replica is located
-   * @param dir directory path where block and meta files are located
-   * @param writer a thread that is writing to this replica
+   * Set the number of bytes that have been acked.
+   * @param bytesAcked number of bytes acked
    */
-  ReplicaInPipeline(Block block, 
-      FsVolumeSpi vol, File dir, Thread writer) {
-    this( block.getBlockId(), block.getNumBytes(), block.getGenerationStamp(),
-        vol, dir, writer, 0L);
-  }
+  void setBytesAcked(long bytesAcked);
 
   /**
-   * Constructor
-   * @param blockId block id
-   * @param len replica length
-   * @param genStamp replica generation stamp
-   * @param vol volume where replica is located
-   * @param dir directory path where block and meta files are located
-   * @param writer a thread that is writing to this replica
-   * @param bytesToReserve disk space to reserve for this replica, based on
-   *                       the estimated maximum block length.
+   * Release any disk space reserved for this replica.
    */
-  ReplicaInPipeline(long blockId, long len, long genStamp,
-      FsVolumeSpi vol, File dir, Thread writer, long bytesToReserve) {
-    super( blockId, len, genStamp, vol, dir);
-    this.bytesAcked = len;
-    this.bytesOnDisk = len;
-    this.writer.set(writer);
-    this.bytesReserved = bytesToReserve;
-    this.originalBytesReserved = bytesToReserve;
-  }
+  public void releaseAllBytesReserved();
 
   /**
-   * Copy constructor.
-   * @param from where to copy from
+   * Store the checksum for the last chunk along with the data length.
+   * @param dataLength number of bytes on disk
+   * @param lastChecksum - checksum bytes for the last chunk
    */
-  public ReplicaInPipeline(ReplicaInPipeline from) {
-    super(from);
-    this.bytesAcked = from.getBytesAcked();
-    this.bytesOnDisk = from.getBytesOnDisk();
-    this.writer.set(from.writer.get());
-    this.bytesReserved = from.bytesReserved;
-    this.originalBytesReserved = from.originalBytesReserved;
-  }
-
-  @Override
-  public long getVisibleLength() {
-    return -1;
-  }
-  
-  @Override  //ReplicaInfo
-  public ReplicaState getState() {
-    return ReplicaState.TEMPORARY;
-  }
+  public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum);
   
-  @Override // ReplicaInPipelineInterface
-  public long getBytesAcked() {
-    return bytesAcked;
-  }
+  /**
+   * Gets the last chunk checksum and the length of the block corresponding
+   * to that checksum.
+   */
+  public ChunkChecksum getLastChecksumAndDataLen();
   
-  @Override // ReplicaInPipelineInterface
-  public void setBytesAcked(long bytesAcked) {
-    long newBytesAcked = bytesAcked - this.bytesAcked;
-    this.bytesAcked = bytesAcked;
+  /**
+   * Create output streams for writing to this replica,
+   * one for block file and one for CRC file
+   *
+   * @param isCreate if it is for creation
+   * @param requestedChecksum the checksum the writer would prefer to use
+   * @return output streams for writing
+   * @throws IOException if any error occurs
+   */
+  public ReplicaOutputStreams createStreams(boolean isCreate,
+      DataChecksum requestedChecksum) throws IOException;
 
-    // Once bytes are ACK'ed we can release equivalent space from the
-    // volume's reservedForRbw count. We could have released it as soon
-    // as the write-to-disk completed but that would be inefficient.
-    getVolume().releaseReservedSpace(newBytesAcked);
-    bytesReserved -= newBytesAcked;
-  }
+  /**
+   * Create an output stream to write restart metadata in case of datanode
+   * shutting down for quick restart.
+   *
+   * @return output stream for writing.
+   * @throws IOException if any error occurs
+   */
+  public OutputStream createRestartMetaStream() throws IOException;
   
-  @Override // ReplicaInPipelineInterface
-  public long getBytesOnDisk() {
-    return bytesOnDisk;
-  }
-
-  @Override
-  public long getBytesReserved() {
-    return bytesReserved;
-  }
+  ReplicaInfo getReplicaInfo();
   
-  @Override
-  public long getOriginalBytesReserved() {
-    return originalBytesReserved;
-  }
-
-  @Override
-  public void releaseAllBytesReserved() {  // ReplicaInPipelineInterface
-    getVolume().releaseReservedSpace(bytesReserved);
-    getVolume().releaseLockedMemory(bytesReserved);
-    bytesReserved = 0;
-  }
-
-  @Override // ReplicaInPipelineInterface
-  public synchronized void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
-    this.bytesOnDisk = dataLength;
-    this.lastChecksum = lastChecksum;
-  }
+  /**
+   * Set the thread that is writing to this replica
+   * @param writer a thread writing to this replica
+   */
+  void setWriter(Thread writer);
   
-  @Override // ReplicaInPipelineInterface
-  public synchronized ChunkChecksum getLastChecksumAndDataLen() {
-    return new ChunkChecksum(getBytesOnDisk(), lastChecksum);
-  }
-
-  public void interruptThread() {
-    Thread thread = writer.get();
-    if (thread != null && thread != Thread.currentThread() 
-        && thread.isAlive()) {
-      thread.interrupt();
-    }
-  }
-
-  @Override  // Object
-  public boolean equals(Object o) {
-    return super.equals(o);
-  }
+  void interruptThread();
   
   /**
    * Attempt to set the writer to a new value.
    */
-  public boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) {
-    return writer.compareAndSet(prevWriter, newWriter);
-  }
+  boolean attemptToSetWriter(Thread prevWriter, Thread newWriter);
 
   /**
-   * Interrupt the writing thread and wait until it dies
+   * Interrupt the writing thread and wait until it dies.
    * @throws IOException the waiting is interrupted
    */
-  public void stopWriter(long xceiverStopTimeout) throws IOException {
-    while (true) {
-      Thread thread = writer.get();
-      if ((thread == null) || (thread == Thread.currentThread()) ||
-          (!thread.isAlive())) {
-        if (writer.compareAndSet(thread, null) == true) {
-          return; // Done
-        }
-        // The writer changed.  Go back to the start of the loop and attempt to
-        // stop the new writer.
-        continue;
-      }
-      thread.interrupt();
-      try {
-        thread.join(xceiverStopTimeout);
-        if (thread.isAlive()) {
-          // Our thread join timed out.
-          final String msg = "Join on writer thread " + thread + " timed out";
-          DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(thread));
-          throw new IOException(msg);
-        }
-      } catch (InterruptedException e) {
-        throw new IOException("Waiting for writer thread is interrupted.");
-      }
-    }
-  }
-
-  @Override  // Object
-  public int hashCode() {
-    return super.hashCode();
-  }
-  
-  @Override // ReplicaInPipelineInterface
-  public ReplicaOutputStreams createStreams(boolean isCreate, 
-      DataChecksum requestedChecksum) throws IOException {
-    File blockFile = getBlockFile();
-    File metaFile = getMetaFile();
-    if (DataNode.LOG.isDebugEnabled()) {
-      DataNode.LOG.debug("writeTo blockfile is " + blockFile +
-                         " of size " + blockFile.length());
-      DataNode.LOG.debug("writeTo metafile is " + metaFile +
-                         " of size " + metaFile.length());
-    }
-    long blockDiskSize = 0L;
-    long crcDiskSize = 0L;
-    
-    // the checksum that should actually be used -- this
-    // may differ from requestedChecksum for appends.
-    final DataChecksum checksum;
-    
-    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
-    
-    if (!isCreate) {
-      // For append or recovery, we must enforce the existing checksum.
-      // Also, verify that the file has correct lengths, etc.
-      boolean checkedMeta = false;
-      try {
-        BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
-        checksum = header.getChecksum();
-        
-        if (checksum.getBytesPerChecksum() !=
-            requestedChecksum.getBytesPerChecksum()) {
-          throw new IOException("Client requested checksum " +
-              requestedChecksum + " when appending to an existing block " +
-              "with different chunk size: " + checksum);
-        }
-        
-        int bytesPerChunk = checksum.getBytesPerChecksum();
-        int checksumSize = checksum.getChecksumSize();
-        
-        blockDiskSize = bytesOnDisk;
-        crcDiskSize = BlockMetadataHeader.getHeaderSize() +
-          (blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize;
-        if (blockDiskSize>0 && 
-            (blockDiskSize>blockFile.length() || crcDiskSize>metaFile.length())) {
-          throw new IOException("Corrupted block: " + this);
-        }
-        checkedMeta = true;
-      } finally {
-        if (!checkedMeta) {
-          // clean up in case of exceptions.
-          IOUtils.closeStream(metaRAF);
-        }
-      }
-    } else {
-      // for create, we can use the requested checksum
-      checksum = requestedChecksum;
-    }
-    
-    FileOutputStream blockOut = null;
-    FileOutputStream crcOut = null;
-    try {
-      blockOut = new FileOutputStream(
-          new RandomAccessFile( blockFile, "rw" ).getFD() );
-      crcOut = new FileOutputStream(metaRAF.getFD() );
-      if (!isCreate) {
-        blockOut.getChannel().position(blockDiskSize);
-        crcOut.getChannel().position(crcDiskSize);
-      }
-      return new ReplicaOutputStreams(blockOut, crcOut, checksum,
-          getVolume().isTransientStorage());
-    } catch (IOException e) {
-      IOUtils.closeStream(blockOut);
-      IOUtils.closeStream(metaRAF);
-      throw e;
-    }
-  }
-
-  @Override
-  public OutputStream createRestartMetaStream() throws IOException {
-    File blockFile = getBlockFile();
-    File restartMeta = new File(blockFile.getParent()  +
-        File.pathSeparator + "." + blockFile.getName() + ".restart");
-    if (restartMeta.exists() && !restartMeta.delete()) {
-      DataNode.LOG.warn("Failed to delete restart meta file: " +
-          restartMeta.getPath());
-    }
-    return new FileOutputStream(restartMeta);
-  }
-
-  @Override
-  public String toString() {
-    return super.toString()
-        + "\n  bytesAcked=" + bytesAcked
-        + "\n  bytesOnDisk=" + bytesOnDisk;
-  }
+  void stopWriter(long xceiverStopTimeout) throws IOException;
 }
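
With ReplicaInPipeline reduced to an interface (its file-backed behaviour moves to the new LocalReplicaInPipeline), writers program against these methods only. A hedged sketch of exercising that surface; the checksum type, chunk size, and helper names are illustrative, and getDataOut() is assumed from the existing ReplicaOutputStreams API rather than from this patch:

import java.io.IOException;

import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.util.DataChecksum;

class PipelineWriteSketch {
  // Opens the replica's block/CRC streams, writes one buffer, and records
  // the on-disk length, last chunk checksum, and acked bytes.
  static void writeOnce(ReplicaInPipeline replica, byte[] data,
      byte[] lastChunkChecksum) throws IOException {
    DataChecksum checksum =
        DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
    ReplicaOutputStreams streams = replica.createStreams(true, checksum);
    try {
      streams.getDataOut().write(data);
      replica.setLastChecksumAndDataLen(data.length, lastChunkChecksum);
      replica.setBytesAcked(data.length);
    } finally {
      streams.close();
    }
  }
}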

+ 0 - 86
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java

@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.util.DataChecksum;
-
-/** 
- * This defines the interface of a replica in Pipeline that's being written to
- */
-public interface ReplicaInPipelineInterface extends Replica {
-  /**
-   * Set the number of bytes received
-   * @param bytesReceived number of bytes received
-   */
-  void setNumBytes(long bytesReceived);
-  
-  /**
-   * Get the number of bytes acked
-   * @return the number of bytes acked
-   */
-  long getBytesAcked();
-  
-  /**
-   * Set the number bytes that have acked
-   * @param bytesAcked number bytes acked
-   */
-  void setBytesAcked(long bytesAcked);
-  
-  /**
-   * Release any disk space reserved for this replica.
-   */
-  public void releaseAllBytesReserved();
-
-  /**
-   * store the checksum for the last chunk along with the data length
-   * @param dataLength number of bytes on disk
-   * @param lastChecksum - checksum bytes for the last chunk
-   */
-  public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum);
-  
-  /**
-   * gets the last chunk checksum and the length of the block corresponding
-   * to that checksum
-   */
-  public ChunkChecksum getLastChecksumAndDataLen();
-  
-  /**
-   * Create output streams for writing to this replica, 
-   * one for block file and one for CRC file
-   * 
-   * @param isCreate if it is for creation
-   * @param requestedChecksum the checksum the writer would prefer to use
-   * @return output streams for writing
-   * @throws IOException if any error occurs
-   */
-  public ReplicaOutputStreams createStreams(boolean isCreate,
-      DataChecksum requestedChecksum) throws IOException;
-
-  /**
-   * Create an output stream to write restart metadata in case of datanode
-   * shutting down for quick restart.
-   *
-   * @return output stream for writing.
-   * @throws IOException if any error occurs
-   */
-  public OutputStream createRestartMetaStream() throws IOException;
-}

+ 169 - 201
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java

@@ -17,23 +17,20 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.util.LightWeightResizableGSet;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * This class is used by datanodes to maintain meta data of its replicas.
  * It provides a general interface for meta information of a replica.
@@ -42,81 +39,26 @@ import com.google.common.annotations.VisibleForTesting;
 abstract public class ReplicaInfo extends Block
     implements Replica, LightWeightResizableGSet.LinkedElement {
 
-  /** For implementing {@link LightWeightResizableGSet.LinkedElement} interface */
+  /** For implementing {@link LightWeightResizableGSet.LinkedElement}. */
   private LightWeightResizableGSet.LinkedElement next;
 
-  /** volume where the replica belongs */
+  /** volume where the replica belongs. */
   private FsVolumeSpi volume;
-  
-  /** directory where block & meta files belong */
-  
-  /**
-   * Base directory containing numerically-identified sub directories and
-   * possibly blocks.
-   */
-  private File baseDir;
-  
-  /**
-   * Whether or not this replica's parent directory includes subdirs, in which
-   * case we can generate them based on the replica's block ID
-   */
-  private boolean hasSubdirs;
-  
-  private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
 
   /**
-   * Constructor
-   * @param block a block
-   * @param vol volume where replica is located
-   * @param dir directory path where block and meta files are located
-   */
-  ReplicaInfo(Block block, FsVolumeSpi vol, File dir) {
-    this(block.getBlockId(), block.getNumBytes(), 
-        block.getGenerationStamp(), vol, dir);
-  }
-  
-  /**
-   * Constructor
-   * @param blockId block id
-   * @param len replica length
-   * @param genStamp replica generation stamp
-   * @param vol volume where replica is located
-   * @param dir directory path where block and meta files are located
-   */
-  ReplicaInfo(long blockId, long len, long genStamp,
-      FsVolumeSpi vol, File dir) {
+  * Constructor
+  * @param vol volume where replica is located
+  * @param blockId block id
+  * @param len replica length
+  * @param genStamp replica generation stamp
+  */
+  ReplicaInfo(FsVolumeSpi vol, long blockId, long len, long genStamp) {
     super(blockId, len, genStamp);
     this.volume = vol;
-    setDirInternal(dir);
-  }
-
-  /**
-   * Copy constructor.
-   * @param from where to copy from
-   */
-  ReplicaInfo(ReplicaInfo from) {
-    this(from, from.getVolume(), from.getDir());
-  }
-  
-  /**
-   * Get the full path of this replica's data file
-   * @return the full path of this replica's data file
-   */
-  public File getBlockFile() {
-    return new File(getDir(), getBlockName());
   }
   
   /**
-   * Get the full path of this replica's meta file
-   * @return the full path of this replica's meta file
-   */
-  public File getMetaFile() {
-    return new File(getDir(),
-        DatanodeUtil.getMetaName(getBlockName(), getGenerationStamp()));
-  }
-  
-  /**
-   * Get the volume where this replica is located on disk
+   * Get the volume where this replica is located on disk.
    * @return the volume where this replica is located on disk
    */
   public FsVolumeSpi getVolume() {
@@ -124,7 +66,7 @@ abstract public class ReplicaInfo extends Block
   }
   
   /**
-   * Set the volume where this replica is located on disk
+   * Set the volume where this replica is located on disk.
    */
   void setVolume(FsVolumeSpi vol) {
     this.volume = vol;
@@ -137,156 +79,182 @@ abstract public class ReplicaInfo extends Block
   public String getStorageUuid() {
     return volume.getStorageID();
   }
-  
+
   /**
-   * Return the parent directory path where this replica is located
-   * @return the parent directory path where this replica is located
+   * Number of bytes reserved for this replica on disk.
    */
-  File getDir() {
-    return hasSubdirs ? DatanodeUtil.idToBlockDir(baseDir,
-        getBlockId()) : baseDir;
+  public long getBytesReserved() {
+    return 0;
   }
 
   /**
-   * Set the parent directory where this replica is located
-   * @param dir the parent directory where the replica is located
+   * Get the {@code URI} for where the data of this replica is stored.
+   * @return {@code URI} for the location of replica data.
    */
-  public void setDir(File dir) {
-    setDirInternal(dir);
-  }
+  abstract public URI getBlockURI();
 
-  private void setDirInternal(File dir) {
-    if (dir == null) {
-      baseDir = null;
-      return;
-    }
-
-    ReplicaDirInfo dirInfo = parseBaseDir(dir);
-    this.hasSubdirs = dirInfo.hasSubidrs;
-    
-    synchronized (internedBaseDirs) {
-      if (!internedBaseDirs.containsKey(dirInfo.baseDirPath)) {
-        // Create a new String path of this file and make a brand new File object
-        // to guarantee we drop the reference to the underlying char[] storage.
-        File baseDir = new File(dirInfo.baseDirPath);
-        internedBaseDirs.put(dirInfo.baseDirPath, baseDir);
-      }
-      this.baseDir = internedBaseDirs.get(dirInfo.baseDirPath);
-    }
-  }
+  /**
+   * Returns an {@link InputStream} to the replica's data.
+   * @param seekOffset the offset at which the read is started from.
+   * @return the {@link InputStream} to read the replica data.
+   * @throws IOException if an error occurs in opening a stream to the data.
+   */
+  abstract public InputStream getDataInputStream(long seekOffset)
+      throws IOException;
 
-  @VisibleForTesting
-  public static class ReplicaDirInfo {
-    public String baseDirPath;
-    public boolean hasSubidrs;
+  /**
+   * Returns an {@link OutputStream} to the replica's data.
+   * @param append indicates if the block should be opened for append.
+   * @return the {@link OutputStream} to write to the replica.
+   * @throws IOException if an error occurs in creating an {@link OutputStream}.
+   */
+  abstract public OutputStream getDataOutputStream(boolean append)
+      throws IOException;
 
-    public ReplicaDirInfo (String baseDirPath, boolean hasSubidrs) {
-      this.baseDirPath = baseDirPath;
-      this.hasSubidrs = hasSubidrs;
-    }
-  }
-  
-  @VisibleForTesting
-  public static ReplicaDirInfo parseBaseDir(File dir) {
-    
-    File currentDir = dir;
-    boolean hasSubdirs = false;
-    while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) {
-      hasSubdirs = true;
-      currentDir = currentDir.getParentFile();
-    }
-    
-    return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs);
-  }
+  /**
+   * @return true if the replica's data exists.
+   */
+  abstract public boolean blockDataExists();
 
   /**
-   * Number of bytes reserved for this replica on disk.
+   * Deletes the replica's block data.
+   *
+   * @return true if the replica's data is successfully deleted.
    */
-  public long getBytesReserved() {
-    return 0;
-  }
+  abstract public boolean deleteBlockData();
 
   /**
-   * Number of bytes originally reserved for this replica. The actual
-   * reservation is adjusted as data is written to disk.
+   * @return the length of the block on storage.
+   */
+  abstract public long getBlockDataLength();
+
+  /**
+   * Get the {@code URI} for where the metadata of this replica is stored.
    *
-   * @return the number of bytes originally reserved for this replica.
+   * @return {@code URI} for the location of replica metadata.
    */
-  public long getOriginalBytesReserved() {
-    return 0;
-  }
+  abstract public URI getMetadataURI();
 
   /**
-   * Copy specified file into a temporary file. Then rename the
-   * temporary file to the original name. This will cause any
-   * hardlinks to the original file to be removed. The temporary
-   * files are created in the same directory. The temporary files will
-   * be recovered (especially on Windows) on datanode restart.
+   * Returns an {@link InputStream} to the replica's metadata.
+   * @param offset the offset at which the read is started from.
+   * @return the {@link LengthInputStream} to read the replica metadata.
+   * @throws IOException
    */
-  private void breakHardlinks(File file, Block b) throws IOException {
-    File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
-    try {
-      FileInputStream in = new FileInputStream(file);
-      try {
-        FileOutputStream out = new FileOutputStream(tmpFile);
-        try {
-          IOUtils.copyBytes(in, out, 16 * 1024);
-        } finally {
-          out.close();
-        }
-      } finally {
-        in.close();
-      }
-      if (file.length() != tmpFile.length()) {
-        throw new IOException("Copy of file " + file + " size " + file.length()+
-                              " into file " + tmpFile +
-                              " resulted in a size of " + tmpFile.length());
-      }
-      FileUtil.replaceFile(tmpFile, file);
-    } catch (IOException e) {
-      boolean done = tmpFile.delete();
-      if (!done) {
-        DataNode.LOG.info("detachFile failed to delete temporary file " +
-                          tmpFile);
-      }
-      throw e;
-    }
-  }
+  abstract public LengthInputStream getMetadataInputStream(long offset)
+      throws IOException;
+
+  /**
+   * Returns an {@link OutputStream} to the replica's metadata.
+   * @param append indicates if the block metadata should be opened for append.
+   * @return the {@link OutputStream} to write to the replica's metadata.
+   * @throws IOException if an error occurs in creating an {@link OutputStream}.
+   */
+  abstract public OutputStream getMetadataOutputStream(boolean append)
+      throws IOException;
+
+  /**
+   * @return true if the replica's metadata exists.
+   */
+  abstract public boolean metadataExists();
 
   /**
-   * This function "breaks hardlinks" to the current replica file.
+   * Deletes the replica's metadata.
    *
-   * When doing a DataNode upgrade, we create a bunch of hardlinks to each block
-   * file.  This cleverly ensures that both the old and the new storage
-   * directories can contain the same block file, without using additional space
-   * for the data.
+   * @return true if the replica's metadata is successfully deleted.
+   */
+  abstract public boolean deleteMetadata();
+
+  /**
+   * @return the length of the metadata on storage.
+   */
+  abstract public long getMetadataLength();
+
+  /**
+   * Rename the metadata {@link URI} to that referenced by {@code destURI}.
    *
-   * However, when we want to append to the replica file, we need to "break" the
-   * hardlink to ensure that the old snapshot continues to contain the old data
-   * length.  If we failed to do that, we could roll back to the previous/
-   * directory during a downgrade, and find that the block contents were longer
-   * than they were at the time of upgrade.
+   * @param destURI the target {@link URI}.
+   * @return true if the rename is successful.
+   * @throws IOException if an exception occurs in the rename.
+   */
+  abstract public boolean renameMeta(URI destURI) throws IOException;
+
+  /**
+   * Rename the data {@link URI} to that referenced by {@code destURI}.
    *
-   * @return true only if data was copied.
+   * @param destURI the target {@link URI}.
+   * @return true if the rename is successful.
+   * @throws IOException if an exception occurs in the rename.
+   */
+  abstract public boolean renameData(URI destURI) throws IOException;
+
+  /**
+   * Update this replica with the {@link StorageLocation} found.
+   * @param replicaLocation the {@link StorageLocation} found for this replica.
+   */
+  abstract public void updateWithReplica(StorageLocation replicaLocation);
+
+  /**
+   * Check whether the block was pinned.
+   * @param localFS the local filesystem to use.
+   * @return true if the block is pinned.
    * @throws IOException
    */
-  public boolean breakHardLinksIfNeeded() throws IOException {
-    File file = getBlockFile();
-    if (file == null || getVolume() == null) {
-      throw new IOException("detachBlock:Block not found. " + this);
-    }
-    File meta = getMetaFile();
-
-    int linkCount = HardLink.getLinkCount(file);
-    if (linkCount > 1) {
-      DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
-          "block " + this);
-      breakHardlinks(file, this);
-    }
-    if (HardLink.getLinkCount(meta) > 1) {
-      breakHardlinks(meta, this);
-    }
-    return true;
+  abstract public boolean getPinning(LocalFileSystem localFS)
+      throws IOException;
+
+  /**
+   * Set a block to be pinned on this datanode so that it cannot be moved
+   * by Balancer/Mover.
+   *
+   * @param localFS the local filesystem to use.
+   * @throws IOException if there is an exception in the pinning.
+   */
+  abstract public void setPinning(LocalFileSystem localFS) throws IOException;
+
+  /**
+   * Bump a replica's generation stamp to a new one.
+   * Its on-disk meta file name is renamed to be the new one too.
+   *
+   * @param newGS new generation stamp
+   * @throws IOException if the change fails
+   */
+  abstract public void bumpReplicaGS(long newGS) throws IOException;
+
+  abstract public ReplicaInfo getOriginalReplica();
+
+  /**
+   * Get the recovery id.
+   * @return the generation stamp that the replica will be bumped to
+   */
+  abstract public long getRecoveryID();
+
+  /**
+   * Set the recovery id.
+   * @param recoveryId the new recoveryId
+   */
+  abstract public void setRecoveryID(long recoveryId);
+
+  abstract public boolean breakHardLinksIfNeeded() throws IOException;
+
+  abstract public ReplicaRecoveryInfo createInfo();
+
+  abstract public int compareWith(ScanInfo info);
+
+  abstract public void truncateBlock(long newLength) throws IOException;
+
+  abstract public void copyMetadata(URI destination) throws IOException;
+
+  abstract public void copyBlockdata(URI destination) throws IOException;
+
+  /**
+   * Number of bytes originally reserved for this replica. The actual
+   * reservation is adjusted as data is written to disk.
+   *
+   * @return the number of bytes originally reserved for this replica.
+   */
+  public long getOriginalBytesReserved() {
+    return 0;
   }
 
   @Override  //Object
@@ -298,7 +266,7 @@ abstract public class ReplicaInfo extends Block
         + "\n  getBytesOnDisk()  = " + getBytesOnDisk()
         + "\n  getVisibleLength()= " + getVisibleLength()
         + "\n  getVolume()       = " + getVolume()
-        + "\n  getBlockFile()    = " + getBlockFile();
+        + "\n  getBlockURI()     = " + getBlockURI();
   }
 
   @Override
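
Since getBlockFile()/getMetaFile() are gone, callers are expected to stay on the URI and stream accessors declared above. A minimal consumer sketch under that assumption; the class name and buffer size are arbitrary:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.io.IOUtils;

class ReplicaDataCopySketch {
  // Streams the replica's block data into 'out' without touching
  // java.io.File, using only the abstract ReplicaInfo accessors.
  static void copyBlockData(ReplicaInfo replica, OutputStream out)
      throws IOException {
    if (!replica.blockDataExists()) {
      throw new IOException("No block data at " + replica.getBlockURI());
    }
    InputStream in = replica.getDataInputStream(0);
    try {
      IOUtils.copyBytes(in, out, 16 * 1024);
    } finally {
      IOUtils.closeStream(in);
    }
  }
}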

+ 12 - 18
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java

@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import java.io.File;
-
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
@@ -31,19 +29,19 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
  * A recovery with higher recovery id preempts recoveries with a lower id.
  *
  */
-public class ReplicaUnderRecovery extends ReplicaInfo {
-  private ReplicaInfo original; // the original replica that needs to be recovered
+public class ReplicaUnderRecovery extends LocalReplica {
+  private LocalReplica original; // original replica to be recovered
   private long recoveryId; // recovery id; it is also the generation stamp 
                            // that the replica will be bumped to after recovery
 
   public ReplicaUnderRecovery(ReplicaInfo replica, long recoveryId) {
-    super(replica, replica.getVolume(), replica.getDir());
+    super(replica, replica.getVolume(), ((LocalReplica)replica).getDir());
     if ( replica.getState() != ReplicaState.FINALIZED &&
          replica.getState() != ReplicaState.RBW &&
          replica.getState() != ReplicaState.RWR ) {
       throw new IllegalArgumentException("Cannot recover replica: " + replica);
     }
-    this.original = replica;
+    this.original = (LocalReplica) replica;
     this.recoveryId = recoveryId;
   }
 
@@ -53,22 +51,16 @@ public class ReplicaUnderRecovery extends ReplicaInfo {
    */
   public ReplicaUnderRecovery(ReplicaUnderRecovery from) {
     super(from);
-    this.original = from.getOriginalReplica();
+    this.original = (LocalReplica) from.getOriginalReplica();
     this.recoveryId = from.getRecoveryID();
   }
 
-  /** 
-   * Get the recovery id
-   * @return the generation stamp that the replica will be bumped to 
-   */
+  @Override
   public long getRecoveryID() {
     return recoveryId;
   }
 
-  /** 
-   * Set the recovery id
-   * @param recoveryId the new recoveryId
-   */
+  @Override
   public void setRecoveryID(long recoveryId) {
     if (recoveryId > this.recoveryId) {
       this.recoveryId = recoveryId;
@@ -82,6 +74,7 @@ public class ReplicaUnderRecovery extends ReplicaInfo {
    * Get the original replica that's under recovery
    * @return the original replica under recovery
    */
+  @Override
   public ReplicaInfo getOriginalReplica() {
     return original;
   }
@@ -120,9 +113,9 @@ public class ReplicaUnderRecovery extends ReplicaInfo {
   }
   
   @Override //ReplicaInfo
-  public void setDir(File dir) {
-    super.setDir(dir);
-    original.setDir(dir);
+  public void updateWithReplica(StorageLocation replicaLocation) {
+    super.updateWithReplica(replicaLocation);
+    original.updateWithReplica(replicaLocation);
   }
   
   @Override //ReplicaInfo
@@ -148,6 +141,7 @@ public class ReplicaUnderRecovery extends ReplicaInfo {
         + "\n  original=" + original;
   }
 
+  @Override
   public ReplicaRecoveryInfo createInfo() {
     return new ReplicaRecoveryInfo(original.getBlockId(), 
         original.getBytesOnDisk(), original.getGenerationStamp(),
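
Because getRecoveryID, setRecoveryID, and createInfo are now declared on ReplicaInfo itself, recovery code no longer needs to downcast to ReplicaUnderRecovery; other replica types simply throw UnsupportedOperationException, as ReplicaWaitingToBeRecovered does below. A hedged sketch with illustrative names:

import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;

class RecoverySketch {
  // Bumps the recovery id only when it increases, mirroring the guard in
  // ReplicaUnderRecovery.setRecoveryID above, then describes the replica.
  static ReplicaRecoveryInfo bumpAndDescribe(ReplicaInfo rur,
      long newRecoveryId) {
    if (newRecoveryId > rur.getRecoveryID()) {
      rur.setRecoveryID(newRecoveryId);
    }
    return rur.createInfo();
  }
}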

+ 26 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java

@@ -22,6 +22,7 @@ import java.io.File;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 
 /**
  * This class represents a replica that is waiting to be recovered.
@@ -32,7 +33,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
  * client continues to write or be recovered as a result of
  * lease recovery.
  */
-public class ReplicaWaitingToBeRecovered extends ReplicaInfo {
+public class ReplicaWaitingToBeRecovered extends LocalReplica {
 
   /**
    * Constructor
@@ -94,4 +95,28 @@ public class ReplicaWaitingToBeRecovered extends ReplicaInfo {
   public String toString() {
     return super.toString();
   }
+
+  @Override
+  public ReplicaInfo getOriginalReplica() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getOriginalReplica");
+  }
+
+  @Override
+  public long getRecoveryID() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support getRecoveryID");
+  }
+
+  @Override
+  public void setRecoveryID(long recoveryId) {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support setRecoveryID");
+  }
+
+  @Override
+  public ReplicaRecoveryInfo createInfo() {
+    throw new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support createInfo");
+  }
 }

+ 4 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

@@ -44,9 +44,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
-import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
@@ -230,10 +229,10 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
   VolumeFailureSummary getVolumeFailureSummary();
 
   /** @return a list of finalized blocks for the given block pool. */
-  List<FinalizedReplica> getFinalizedBlocks(String bpid);
+  List<ReplicaInfo> getFinalizedBlocks(String bpid);
 
   /** @return a list of finalized blocks for the given block pool. */
-  List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid);
+  List<ReplicaInfo> getFinalizedBlocksOnPersistentStorage(String bpid);
 
   /**
    * Check whether the in-memory block record matches the block on the disk,
@@ -337,7 +336,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * @param temporary the temporary replica being converted
    * @return the result RBW
    */
-  ReplicaInPipelineInterface convertTemporaryToRbw(
+  ReplicaInPipeline convertTemporaryToRbw(
       ExtendedBlock temporary) throws IOException;
 
   /**
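
With getFinalizedBlocks now returning the abstract ReplicaInfo type instead of FinalizedReplica, callers can no longer reach for block files. A hedged sketch of consuming the new return type; the dataset, pool id, and class name are placeholders:

import java.util.List;

import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

class FinalizedBlocksSketch {
  // Sums the on-disk length of every finalized replica in one block pool,
  // using only accessors that survive the File-to-ReplicaInfo refactor.
  static long finalizedBytes(FsDatasetSpi<?> dataset, String bpid) {
    long total = 0;
    List<ReplicaInfo> finalized = dataset.getFinalizedBlocks(bpid);
    for (ReplicaInfo replica : finalized) {
      total += replica.getBlockDataLength();
    }
    return total;
  }
}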

+ 49 - 25
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java

@@ -45,13 +45,13 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
-import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.DataChecksum;
@@ -309,14 +309,14 @@ class BlockPoolSlice {
     return rbwFile;
   }
 
-  File addFinalizedBlock(Block b, File f) throws IOException {
+  File addFinalizedBlock(Block b, ReplicaInfo replicaInfo) throws IOException {
     File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
     if (!blockDir.exists()) {
       if (!blockDir.mkdirs()) {
         throw new IOException("Failed to mkdirs " + blockDir);
       }
     }
-    File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
+    File blockFile = FsDatasetImpl.moveBlockFiles(b, replicaInfo, blockDir);
     File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
     if (dfsUsage instanceof CachingGetSpaceUsed) {
       ((CachingGetSpaceUsed) dfsUsage).incDfsUsed(
@@ -329,16 +329,28 @@ class BlockPoolSlice {
    * Move a persisted replica from lazypersist directory to a subdirectory
    * under finalized.
    */
-  File activateSavedReplica(Block b, File metaFile, File blockFile)
-      throws IOException {
-    final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
+  ReplicaInfo activateSavedReplica(ReplicaInfo replicaInfo,
+      RamDiskReplica replicaState) throws IOException {
+    File metaFile = replicaState.getSavedMetaFile();
+    File blockFile = replicaState.getSavedBlockFile();
+    final long blockId = replicaInfo.getBlockId();
+    final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, blockId);
     final File targetBlockFile = new File(blockDir, blockFile.getName());
     final File targetMetaFile = new File(blockDir, metaFile.getName());
     FileUtils.moveFile(blockFile, targetBlockFile);
     FsDatasetImpl.LOG.info("Moved " + blockFile + " to " + targetBlockFile);
     FileUtils.moveFile(metaFile, targetMetaFile);
     FsDatasetImpl.LOG.info("Moved " + metaFile + " to " + targetMetaFile);
-    return targetBlockFile;
+
+    ReplicaInfo newReplicaInfo =
+        new ReplicaBuilder(ReplicaState.FINALIZED)
+        .setBlockId(blockId)
+        .setLength(replicaInfo.getBytesOnDisk())
+        .setGenerationStamp(replicaInfo.getGenerationStamp())
+        .setFsVolume(replicaState.getLazyPersistVolume())
+        .setDirectoryToUse(targetBlockFile.getParentFile())
+        .build();
+    return newReplicaInfo;
   }
 
   void checkDirs() throws DiskErrorException {
@@ -461,9 +473,13 @@ class BlockPoolSlice {
     long blockId = block.getBlockId();
     long genStamp = block.getGenerationStamp();
     if (isFinalized) {
-      newReplica = new FinalizedReplica(blockId,
-          block.getNumBytes(), genStamp, volume, DatanodeUtil
-          .idToBlockDir(finalizedDir, blockId));
+      newReplica = new ReplicaBuilder(ReplicaState.FINALIZED)
+          .setBlockId(blockId)
+          .setLength(block.getNumBytes())
+          .setGenerationStamp(genStamp)
+          .setFsVolume(volume)
+          .setDirectoryToUse(DatanodeUtil.idToBlockDir(finalizedDir, blockId))
+          .build();
     } else {
       File file = new File(rbwDir, block.getBlockName());
       boolean loadRwr = true;
@@ -477,9 +493,15 @@ class BlockPoolSlice {
           // It didn't expire. Load the replica as a RBW.
           // We don't know the expected block length, so just use 0
           // and don't reserve any more space for writes.
-          newReplica = new ReplicaBeingWritten(blockId,
-              validateIntegrityAndSetLength(file, genStamp),
-              genStamp, volume, file.getParentFile(), null, 0);
+          newReplica = new ReplicaBuilder(ReplicaState.RBW)
+              .setBlockId(blockId)
+              .setLength(validateIntegrityAndSetLength(file, genStamp))
+              .setGenerationStamp(genStamp)
+              .setFsVolume(volume)
+              .setDirectoryToUse(file.getParentFile())
+              .setWriterThread(null)
+              .setBytesToReserve(0)
+              .build();
           loadRwr = false;
         }
         sc.close();
@@ -496,9 +518,13 @@ class BlockPoolSlice {
       }
       // Restart meta doesn't exist or expired.
       if (loadRwr) {
-        newReplica = new ReplicaWaitingToBeRecovered(blockId,
-            validateIntegrityAndSetLength(file, genStamp),
-            genStamp, volume, file.getParentFile());
+        ReplicaBuilder builder = new ReplicaBuilder(ReplicaState.RWR)
+            .setBlockId(blockId)
+            .setLength(validateIntegrityAndSetLength(file, genStamp))
+            .setGenerationStamp(genStamp)
+            .setFsVolume(volume)
+            .setDirectoryToUse(file.getParentFile());
+        newReplica = builder.build();
       }
     }
 
@@ -614,7 +640,7 @@ class BlockPoolSlice {
 
     // it's the same block so don't ever delete it, even if GS or size
     // differs.  caller should keep the one it just discovered on disk
-    if (replica1.getBlockFile().equals(replica2.getBlockFile())) {
+    if (replica1.getBlockURI().equals(replica2.getBlockURI())) {
       return null;
     }
     if (replica1.getGenerationStamp() != replica2.getGenerationStamp()) {
@@ -641,13 +667,11 @@ class BlockPoolSlice {
 
   private void deleteReplica(final ReplicaInfo replicaToDelete) {
     // Delete the files on disk. Failure here is okay.
-    final File blockFile = replicaToDelete.getBlockFile();
-    if (!blockFile.delete()) {
-      LOG.warn("Failed to delete block file " + blockFile);
+    if (!replicaToDelete.deleteBlockData()) {
+      LOG.warn("Failed to delete block file for replica " + replicaToDelete);
     }
-    final File metaFile = replicaToDelete.getMetaFile();
-    if (!metaFile.delete()) {
-      LOG.warn("Failed to delete meta file " + metaFile);
+    if (!replicaToDelete.deleteMetadata()) {
+      LOG.warn("Failed to delete meta file for replica " + replicaToDelete);
     }
   }
 

+ 44 - 27
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
 import java.io.FileDescriptor;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -34,6 +35,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.io.IOUtils;
@@ -211,12 +214,12 @@ class FsDatasetAsyncDiskService {
    * Delete the block file and meta file from the disk asynchronously, adjust
    * dfsUsed statistics accordingly.
    */
-  void deleteAsync(FsVolumeReference volumeRef, File blockFile, File metaFile,
+  void deleteAsync(FsVolumeReference volumeRef, ReplicaInfo replicaToDelete,
       ExtendedBlock block, String trashDirectory) {
     LOG.info("Scheduling " + block.getLocalBlock()
-        + " file " + blockFile + " for deletion");
+        + " replica " + replicaToDelete + " for deletion");
     ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(
-        volumeRef, blockFile, metaFile, block, trashDirectory);
+        volumeRef, replicaToDelete, block, trashDirectory);
     execute(((FsVolumeImpl) volumeRef.getVolume()).getCurrentDir(), deletionTask);
   }
   
@@ -227,19 +230,18 @@ class FsDatasetAsyncDiskService {
    *  files are deleted immediately.
    */
   class ReplicaFileDeleteTask implements Runnable {
-    final FsVolumeReference volumeRef;
-    final FsVolumeImpl volume;
-    final File blockFile;
-    final File metaFile;
-    final ExtendedBlock block;
-    final String trashDirectory;
-    
-    ReplicaFileDeleteTask(FsVolumeReference volumeRef, File blockFile,
-        File metaFile, ExtendedBlock block, String trashDirectory) {
+    private final FsVolumeReference volumeRef;
+    private final FsVolumeImpl volume;
+    private final ReplicaInfo replicaToDelete;
+    private final ExtendedBlock block;
+    private final String trashDirectory;
+
+    ReplicaFileDeleteTask(FsVolumeReference volumeRef,
+        ReplicaInfo replicaToDelete, ExtendedBlock block,
+        String trashDirectory) {
       this.volumeRef = volumeRef;
       this.volume = (FsVolumeImpl) volumeRef.getVolume();
-      this.blockFile = blockFile;
-      this.metaFile = metaFile;
+      this.replicaToDelete = replicaToDelete;
       this.block = block;
       this.trashDirectory = trashDirectory;
     }
@@ -248,15 +250,22 @@ class FsDatasetAsyncDiskService {
     public String toString() {
       // Called in AsyncDiskService.execute for displaying error messages.
       return "deletion of block " + block.getBlockPoolId() + " "
-          + block.getLocalBlock() + " with block file " + blockFile
-          + " and meta file " + metaFile + " from volume " + volume;
+          + block.getLocalBlock() + " with block file "
+          + replicaToDelete.getBlockURI() + " and meta file "
+          + replicaToDelete.getMetadataURI() + " from volume " + volume;
     }
 
     private boolean deleteFiles() {
-      return blockFile.delete() && (metaFile.delete() || !metaFile.exists());
+      return replicaToDelete.deleteBlockData() &&
+        (replicaToDelete.deleteMetadata() || !replicaToDelete.metadataExists());
     }
 
     private boolean moveFiles() {
+      if (trashDirectory == null) {
+        LOG.error("Trash dir for replica " + replicaToDelete + " is null");
+        return false;
+      }
+
       File trashDirFile = new File(trashDirectory);
       if (!trashDirFile.exists() && !trashDirFile.mkdirs()) {
         LOG.error("Failed to create trash directory " + trashDirectory);
@@ -264,20 +273,28 @@ class FsDatasetAsyncDiskService {
       }
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Moving files " + blockFile.getName() + " and " +
-            metaFile.getName() + " to trash.");
+        LOG.debug("Moving files " + replicaToDelete.getBlockURI() + " and " +
+            replicaToDelete.getMetadataURI() + " to trash.");
       }
 
-      File newBlockFile = new File(trashDirectory, blockFile.getName());
-      File newMetaFile = new File(trashDirectory, metaFile.getName());
-      return (blockFile.renameTo(newBlockFile) &&
-              metaFile.renameTo(newMetaFile));
+      final String blockName = replicaToDelete.getBlockName();
+      final long genstamp = replicaToDelete.getGenerationStamp();
+      File newBlockFile = new File(trashDirectory, blockName);
+      File newMetaFile = new File(trashDirectory,
+          DatanodeUtil.getMetaName(blockName, genstamp));
+      try {
+        return (replicaToDelete.renameData(newBlockFile.toURI()) &&
+                replicaToDelete.renameMeta(newMetaFile.toURI()));
+      } catch (IOException e) {
+        LOG.error("Error moving files to trash: " + replicaToDelete, e);
+      }
+      return false;
     }
 
     @Override
     public void run() {
-      final long blockLength = blockFile.length();
-      final long metaLength = metaFile.length();
+      final long blockLength = replicaToDelete.getBlockDataLength();
+      final long metaLength = replicaToDelete.getMetadataLength();
       boolean result;
 
       result = (trashDirectory == null) ? deleteFiles() : moveFiles();
@@ -286,7 +303,7 @@ class FsDatasetAsyncDiskService {
         LOG.warn("Unexpected error trying to "
             + (trashDirectory == null ? "delete" : "move")
             + " block " + block.getBlockPoolId() + " " + block.getLocalBlock()
-            + " at file " + blockFile + ". Ignored.");
+            + " at file " + replicaToDelete.getBlockURI() + ". Ignored.");
       } else {
         if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
           datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
@@ -294,7 +311,7 @@ class FsDatasetAsyncDiskService {
         volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength);
         volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength);
         LOG.info("Deleted " + block.getBlockPoolId() + " "
-            + block.getLocalBlock() + " file " + blockFile);
+            + block.getLocalBlock() + " URI " + replicaToDelete.getBlockURI());
       }
       updateDeletedBlockId(block);
       IOUtils.cleanup(null, volumeRef);

File diff suppressed because it is too large
+ 199 - 369
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java


+ 18 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java

@@ -18,14 +18,17 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.Arrays;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.io.IOUtils;
 
 /** Utility methods. */
 @InterfaceAudience.Private
@@ -71,6 +74,21 @@ public class FsDatasetUtil {
     return matches[0];
   }
 
+  public static FileInputStream openAndSeek(File file, long offset)
+      throws IOException {
+    RandomAccessFile raf = null;
+    try {
+      raf = new RandomAccessFile(file, "r");
+      if (offset > 0) {
+        raf.seek(offset);
+      }
+      return new FileInputStream(raf.getFD());
+    } catch(IOException ioe) {
+      IOUtils.cleanup(null, raf);
+      throw ioe;
+    }
+  }
+
   /**
    * Find the meta-file for the specified block file
    * and then return the generation stamp from the name of the meta-file.
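
The new openAndSeek helper wraps a RandomAccessFile positioned at the requested offset and exposes it as a FileInputStream. A short usage sketch; the caller class and arguments are illustrative:

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;

class OpenAndSeekSketch {
  // Reads up to buf.length bytes starting at 'offset' in the given file.
  static int readAt(File blockFile, long offset, byte[] buf)
      throws IOException {
    try (FileInputStream in = FsDatasetUtil.openAndSeek(blockFile, offset)) {
      return in.read(buf);
    }
  }
}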

+ 145 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java

@@ -47,11 +47,19 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
+import org.apache.hadoop.hdfs.server.datanode.LocalReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -102,7 +110,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
   // Disk space reserved for blocks (RBW or Re-replicating) open for write.
   private AtomicLong reservedForReplicas;
   private long recentReserved = 0;
-
+  private final Configuration conf;
   // Capacity configured. This is useful when we want to
   // limit the visible capacity for tests. If negative, then we just
   // query from the filesystem.
@@ -130,6 +138,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
     this.usage = new DF(parent, conf);
     this.storageType = storageType;
     this.configuredCapacity = -1;
+    this.conf = conf;
     cacheExecutor = initializeCacheExecutor(parent);
   }
 
@@ -896,10 +905,15 @@ public class FsVolumeImpl implements FsVolumeSpi {
    * @return
    * @throws IOException
    */
-  File addFinalizedBlock(String bpid, Block b, File f, long bytesReserved)
-      throws IOException {
+  ReplicaInfo addFinalizedBlock(String bpid, Block b, ReplicaInfo replicaInfo,
+      long bytesReserved) throws IOException {
     releaseReservedSpace(bytesReserved);
-    return getBlockPoolSlice(bpid).addFinalizedBlock(b, f);
+    File dest = getBlockPoolSlice(bpid).addFinalizedBlock(b, replicaInfo);
+    return new ReplicaBuilder(ReplicaState.FINALIZED)
+        .setBlock(replicaInfo)
+        .setFsVolume(this)
+        .setDirectoryToUse(dest.getParentFile())
+        .build();
   }
 
   Executor getCacheExecutor() {
@@ -950,18 +964,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
     }
   }
 
-  void addBlockPool(String bpid, Configuration conf) throws IOException {
-    addBlockPool(bpid, conf, null);
+  void addBlockPool(String bpid, Configuration c) throws IOException {
+    addBlockPool(bpid, c, null);
   }
 
-  void addBlockPool(String bpid, Configuration conf, Timer timer)
+  void addBlockPool(String bpid, Configuration c, Timer timer)
       throws IOException {
     File bpdir = new File(currentDir, bpid);
     BlockPoolSlice bp;
     if (timer == null) {
-      bp = new BlockPoolSlice(bpid, this, bpdir, conf, new Timer());
+      bp = new BlockPoolSlice(bpid, this, bpdir, c, new Timer());
     } else {
-      bp = new BlockPoolSlice(bpid, this, bpdir, conf, timer);
+      bp = new BlockPoolSlice(bpid, this, bpdir, c, timer);
     }
     bpSlices.put(bpid, bp);
   }
@@ -1053,5 +1067,127 @@ public class FsVolumeImpl implements FsVolumeSpi {
   DatanodeStorage toDatanodeStorage() {
     return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
   }
+
+
+  public ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo,
+      long newGS, long estimateBlockLen) throws IOException {
+
+    long bytesReserved = estimateBlockLen - replicaInfo.getNumBytes();
+    if (getAvailable() < bytesReserved) {
+      throw new DiskOutOfSpaceException("Insufficient space for appending to "
+          + replicaInfo);
+    }
+
+    assert replicaInfo.getVolume() == this:
+      "The volume of the replica should be the same as this volume";
+
+    // construct a RBW replica with the new GS
+    File newBlkFile = new File(getRbwDir(bpid), replicaInfo.getBlockName());
+    LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW)
+        .setBlockId(replicaInfo.getBlockId())
+        .setLength(replicaInfo.getNumBytes())
+        .setGenerationStamp(newGS)
+        .setFsVolume(this)
+        .setDirectoryToUse(newBlkFile.getParentFile())
+        .setWriterThread(Thread.currentThread())
+        .setBytesToReserve(bytesReserved)
+        .buildLocalReplicaInPipeline();
+
+    // rename meta file to rbw directory
+    // rename block file to rbw directory
+    newReplicaInfo.moveReplicaFrom(replicaInfo, newBlkFile);
+
+    reserveSpaceForReplica(bytesReserved);
+    return newReplicaInfo;
+  }
+
+  public ReplicaInPipeline createRbw(ExtendedBlock b) throws IOException {
+
+    File f = createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
+    LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW)
+        .setBlockId(b.getBlockId())
+        .setGenerationStamp(b.getGenerationStamp())
+        .setFsVolume(this)
+        .setDirectoryToUse(f.getParentFile())
+        .setBytesToReserve(b.getNumBytes())
+        .buildLocalReplicaInPipeline();
+    return newReplicaInfo;
+  }
+
+  public ReplicaInPipeline convertTemporaryToRbw(ExtendedBlock b,
+      ReplicaInfo temp) throws IOException {
+
+    final long blockId = b.getBlockId();
+    final long expectedGs = b.getGenerationStamp();
+    final long visible = b.getNumBytes();
+    final long numBytes = temp.getNumBytes();
+
+    // move block files to the rbw directory
+    BlockPoolSlice bpslice = getBlockPoolSlice(b.getBlockPoolId());
+    final File dest = FsDatasetImpl.moveBlockFiles(b.getLocalBlock(), temp,
+        bpslice.getRbwDir());
+    // create RBW
+    final LocalReplicaInPipeline rbw = new ReplicaBuilder(ReplicaState.RBW)
+        .setBlockId(blockId)
+        .setLength(numBytes)
+        .setGenerationStamp(expectedGs)
+        .setFsVolume(this)
+        .setDirectoryToUse(dest.getParentFile())
+        .setWriterThread(Thread.currentThread())
+        .setBytesToReserve(0)
+        .buildLocalReplicaInPipeline();
+    rbw.setBytesAcked(visible);
+    return rbw;
+  }
+
+  public ReplicaInPipeline createTemporary(ExtendedBlock b) throws IOException {
+    // create a temporary file to hold block in the designated volume
+    File f = createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
+    LocalReplicaInPipeline newReplicaInfo =
+        new ReplicaBuilder(ReplicaState.TEMPORARY)
+          .setBlockId(b.getBlockId())
+          .setGenerationStamp(b.getGenerationStamp())
+          .setDirectoryToUse(f.getParentFile())
+          .setBytesToReserve(b.getLocalBlock().getNumBytes())
+          .setFsVolume(this)
+          .buildLocalReplicaInPipeline();
+    return newReplicaInfo;
+  }
+
+  public ReplicaInPipeline updateRURCopyOnTruncate(ReplicaInfo rur,
+      String bpid, long newBlockId, long recoveryId, long newlength)
+      throws IOException {
+
+    rur.breakHardLinksIfNeeded();
+    File[] copiedReplicaFiles =
+        copyReplicaWithNewBlockIdAndGS(rur, bpid, newBlockId, recoveryId);
+    File blockFile = copiedReplicaFiles[1];
+    File metaFile = copiedReplicaFiles[0];
+    LocalReplica.truncateBlock(blockFile, metaFile,
+        rur.getNumBytes(), newlength);
+
+    LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW)
+        .setBlockId(newBlockId)
+        .setGenerationStamp(recoveryId)
+        .setFsVolume(this)
+        .setDirectoryToUse(blockFile.getParentFile())
+        .setBytesToReserve(newlength)
+        .buildLocalReplicaInPipeline();
+    return newReplicaInfo;
+  }
+
+  private File[] copyReplicaWithNewBlockIdAndGS(
+      ReplicaInfo replicaInfo, String bpid, long newBlkId, long newGS)
+      throws IOException {
+    String blockFileName = Block.BLOCK_FILE_PREFIX + newBlkId;
+    FsVolumeImpl v = (FsVolumeImpl) replicaInfo.getVolume();
+    final File tmpDir = v.getBlockPoolSlice(bpid).getTmpDir();
+    final File destDir = DatanodeUtil.idToBlockDir(tmpDir, newBlkId);
+    final File dstBlockFile = new File(destDir, blockFileName);
+    final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS);
+    return FsDatasetImpl.copyBlockFiles(replicaInfo, dstMetaFile,
+        dstBlockFile, true, DFSUtilClient.getSmallBufferSize(conf), conf);
+  }
+
 }
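
Illustrative sketch (not part of the commit): how the ReplicaBuilder fluent API introduced above is typically used to construct an RBW replica, mirroring the createRbw() hunk; the local variables vol, block and rbwFile are placeholders, not names from the patch.

    // Hypothetical caller-side fragment; 'vol', 'block' and 'rbwFile' are placeholders.
    LocalReplicaInPipeline rbw = new ReplicaBuilder(ReplicaState.RBW)
        .setBlockId(block.getBlockId())
        .setGenerationStamp(block.getGenerationStamp())
        .setFsVolume(vol)
        .setDirectoryToUse(rbwFile.getParentFile())
        .setBytesToReserve(block.getNumBytes())
        .buildLocalReplicaInPipeline();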
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java

@@ -311,7 +311,7 @@ class FsVolumeList {
     } else {
       // If the volume is not put into a volume scanner, it does not need to
       // hold the reference.
-      IOUtils.cleanup(FsDatasetImpl.LOG, ref);
+      IOUtils.cleanup(null, ref);
     }
     // If the volume is used to replace a failed volume, it needs to reset the
     // volume failure info for this volume.

+ 17 - 17
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 
 import java.io.File;
@@ -182,8 +183,7 @@ class RamDiskAsyncLazyPersistService {
    */
   void submitLazyPersistTask(String bpId, long blockId,
       long genStamp, long creationTime,
-      File metaFile, File blockFile,
-      FsVolumeReference target) throws IOException {
+      ReplicaInfo replica, FsVolumeReference target) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("LazyWriter schedule async task to persist RamDisk block pool id: "
           + bpId + " block id: " + blockId);
@@ -198,31 +198,29 @@ class RamDiskAsyncLazyPersistService {
     }
 
     ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask(
-        bpId, blockId, genStamp, creationTime, blockFile, metaFile,
+        bpId, blockId, genStamp, creationTime, replica,
         target, lazyPersistDir);
     execute(volume.getCurrentDir(), lazyPersistTask);
   }
 
   class ReplicaLazyPersistTask implements Runnable {
-    final String bpId;
-    final long blockId;
-    final long genStamp;
-    final long creationTime;
-    final File blockFile;
-    final File metaFile;
-    final FsVolumeReference targetVolume;
-    final File lazyPersistDir;
+    private final String bpId;
+    private final long blockId;
+    private final long genStamp;
+    private final long creationTime;
+    private final ReplicaInfo replicaInfo;
+    private final FsVolumeReference targetVolume;
+    private final File lazyPersistDir;
 
     ReplicaLazyPersistTask(String bpId, long blockId,
         long genStamp, long creationTime,
-        File blockFile, File metaFile,
+        ReplicaInfo replicaInfo,
         FsVolumeReference targetVolume, File lazyPersistDir) {
       this.bpId = bpId;
       this.blockId = blockId;
       this.genStamp = genStamp;
       this.creationTime = creationTime;
-      this.blockFile = blockFile;
-      this.metaFile = metaFile;
+      this.replicaInfo = replicaInfo;
       this.targetVolume = targetVolume;
       this.lazyPersistDir = lazyPersistDir;
     }
@@ -232,8 +230,10 @@ class RamDiskAsyncLazyPersistService {
       // Called in AsyncLazyPersistService.execute for displaying error messages.
       return "LazyWriter async task of persist RamDisk block pool id:"
           + bpId + " block pool id: "
-          + blockId + " with block file " + blockFile
-          + " and meta file " + metaFile + " to target volume " + targetVolume;}
+          + blockId + " with block file " + replicaInfo.getBlockURI()
+          + " and meta file " + replicaInfo.getMetadataURI()
+          + " to target volume " + targetVolume;
+    }
 
     @Override
     public void run() {
@@ -243,7 +243,7 @@ class RamDiskAsyncLazyPersistService {
         int smallBufferSize = DFSUtilClient.getSmallBufferSize(EMPTY_HDFS_CONF);
         // No FsDatasetImpl lock for the file copy
         File targetFiles[] = FsDatasetImpl.copyBlockFiles(
-            blockId, genStamp, metaFile, blockFile, lazyPersistDir, true,
+            blockId, genStamp, replicaInfo, lazyPersistDir, true,
             smallBufferSize, conf);
 
         // Lock FsDataSetImpl during onCompleteLazyPersist callback
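
Illustrative sketch (not part of the commit): the caller-side effect of the new submitLazyPersistTask() signature, which takes a single ReplicaInfo in place of the separate meta and block files; 'asyncService', 'replica' and 'targetRef' are placeholder variables.

    // Hypothetical call site; parameter order taken from the hunk above.
    asyncService.submitLazyPersistTask(bpId, blockId, genStamp, creationTime,
        replica, targetRef);
    // The task later resolves on-disk paths itself via
    // replica.getBlockURI() and replica.getMetadataURI().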

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java

@@ -39,7 +39,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
@@ -530,7 +530,7 @@ public class TestClientProtocolForPipelineRecovery {
 
       DataNodeFaultInjector.set(new DataNodeFaultInjector() {
         @Override
-        public void failPipeline(ReplicaInPipelineInterface replicaInfo,
+        public void failPipeline(ReplicaInPipeline replicaInfo,
             String mirror) throws IOException {
           if (!lastDn.equals(mirror)) {
             // Only fail for second DN

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestCrcCorruption.java

@@ -34,7 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -172,12 +172,12 @@ public class TestCrcCorruption {
       final int dnIdx = 0;
       final DataNode dn = cluster.getDataNodes().get(dnIdx);
       final String bpid = cluster.getNamesystem().getBlockPoolId();
-      List<FinalizedReplica> replicas =
+      List<ReplicaInfo> replicas =
           dn.getFSDataset().getFinalizedBlocks(bpid);
       assertTrue("Replicas do not exist", !replicas.isEmpty());
 
       for (int idx = 0; idx < replicas.size(); idx++) {
-        FinalizedReplica replica = replicas.get(idx);
+        ReplicaInfo replica = replicas.get(idx);
         ExtendedBlock eb = new ExtendedBlock(bpid, replica);
         if (idx % 3 == 0) {
           LOG.info("Deliberately removing meta for block " + eb);

+ 26 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -132,7 +132,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   // information about a single block
-  private class BInfo implements ReplicaInPipelineInterface {
+  private class BInfo implements ReplicaInPipeline {
     final Block theBlock;
     private boolean finalized = false; // if not finalized => ongoing creation
     SimulatedOutputStream oStream = null;
@@ -330,6 +330,28 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     public boolean isOnTransientStorage() {
       return false;
     }
+
+    @Override
+    public ReplicaInfo getReplicaInfo() {
+      return null;
+    }
+
+    @Override
+    public void setWriter(Thread writer) {
+    }
+
+    @Override
+    public void interruptThread() {
+    }
+
+    @Override
+    public boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) {
+      return false;
+    }
+
+    @Override
+    public void stopWriter(long xceiverStopTimeout) throws IOException {
+    }
   }
   
   /**
@@ -1228,7 +1250,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override
-  public ReplicaInPipelineInterface convertTemporaryToRbw(ExtendedBlock temporary)
+  public ReplicaInPipeline convertTemporaryToRbw(ExtendedBlock temporary)
       throws IOException {
     final Map<Block, BInfo> map = blockMap.get(temporary.getBlockPoolId());
     if (map == null) {
@@ -1302,12 +1324,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   @Override
-  public List<FinalizedReplica> getFinalizedBlocks(String bpid) {
+  public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
     throw new UnsupportedOperationException();
   }
 
   @Override
-  public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
+  public List<ReplicaInfo> getFinalizedBlocksOnPersistentStorage(String bpid) {
     throw new UnsupportedOperationException();
   }
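
Consolidated sketch (not part of the commit): the writer-management methods a test double must stub once it implements ReplicaInPipeline instead of ReplicaInPipelineInterface; the method names come from the BInfo and ExternalReplicaInPipeline hunks, the no-op bodies are illustrative.

    // Hypothetical no-op stubs for the methods required by the renamed interface.
    @Override
    public ReplicaInfo getReplicaInfo() { return null; }
    @Override
    public void setWriter(Thread writer) { }
    @Override
    public void interruptThread() { }
    @Override
    public boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) { return false; }
    @Override
    public void stopWriter(long xceiverStopTimeout) throws IOException { }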
 

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolSliceStorage.java

@@ -21,6 +21,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.File;
 import java.util.Random;
@@ -105,7 +106,10 @@ public class TestBlockPoolSliceStorage {
 
     LOG.info("Got subdir " + blockFileSubdir);
     LOG.info("Generated file path " + testFilePath);
-    assertThat(storage.getTrashDirectory(new File(testFilePath)), is(expectedTrashPath));
+
+    ReplicaInfo info = Mockito.mock(ReplicaInfo.class);
+    Mockito.when(info.getBlockURI()).thenReturn(new File(testFilePath).toURI());
+    assertThat(storage.getTrashDirectory(info), is(expectedTrashPath));
   }
 
   /*

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java

@@ -667,7 +667,7 @@ public class TestBlockRecovery {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Running " + GenericTestUtils.getMethodName());
     }
-    ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(
+    ReplicaInPipeline replicaInfo = dn.data.createRbw(
         StorageType.DEFAULT, block, false).getReplica();
     ReplicaOutputStreams streams = null;
     try {

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeRollingUpgrade.java

@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * Ensure that the DataNode correctly handles rolling upgrade
@@ -114,8 +115,11 @@ public class TestDataNodeRollingUpgrade {
   }
 
   private File getTrashFileForBlock(File blockFile, boolean exists) {
+
+    ReplicaInfo info = Mockito.mock(ReplicaInfo.class);
+    Mockito.when(info.getBlockURI()).thenReturn(blockFile.toURI());
     File trashFile = new File(
-        dn0.getStorage().getTrashDirectoryForBlockFile(blockPoolId, blockFile));
+        dn0.getStorage().getTrashDirectoryForReplica(blockPoolId, info));
     assertEquals(exists, trashFile.exists());
     return trashFile;
   }
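
Illustrative sketch (not part of the commit): the mocking pattern these tests now share, stubbing ReplicaInfo.getBlockURI() so storage lookups that previously took a java.io.File can be exercised with a ReplicaInfo; 'blockFile' is a placeholder, while dn0 and blockPoolId are the test fields shown in the hunk above.

    // Hypothetical fragment mirroring the Mockito usage in the two test hunks above.
    ReplicaInfo info = Mockito.mock(ReplicaInfo.class);
    Mockito.when(info.getBlockURI()).thenReturn(blockFile.toURI());
    String trashDir =
        dn0.getStorage().getTrashDirectoryForReplica(blockPoolId, info);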

+ 8 - 9
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java

@@ -116,8 +116,8 @@ public class TestDirectoryScanner {
   private long truncateBlockFile() throws IOException {
     try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
-        File f = b.getBlockFile();
-        File mf = b.getMetaFile();
+        File f = new File(b.getBlockURI());
+        File mf = new File(b.getMetadataURI());
         // Truncate a block file that has a corresponding metadata file
         if (f.exists() && f.length() != 0 && mf.exists()) {
           FileOutputStream s = null;
@@ -141,8 +141,8 @@ public class TestDirectoryScanner {
   private long deleteBlockFile() {
     try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
-        File f = b.getBlockFile();
-        File mf = b.getMetaFile();
+        File f = new File(b.getBlockURI());
+        File mf = new File(b.getMetadataURI());
         // Delete a block file that has corresponding metadata file
         if (f.exists() && mf.exists() && f.delete()) {
           LOG.info("Deleting block file " + f.getAbsolutePath());
@@ -157,10 +157,9 @@ public class TestDirectoryScanner {
   private long deleteMetaFile() {
     try(AutoCloseableLock lock = fds.acquireDatasetLock()) {
       for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
-        File file = b.getMetaFile();
         // Delete a metadata file
-        if (file.exists() && file.delete()) {
-          LOG.info("Deleting metadata file " + file.getAbsolutePath());
+        if (b.metadataExists() && b.deleteMetadata()) {
+          LOG.info("Deleting metadata " + b.getMetadataURI());
           return b.getBlockId();
         }
       }
@@ -184,8 +183,8 @@ public class TestDirectoryScanner {
           }
 
           // Volume without a copy of the block. Make a copy now.
-          File sourceBlock = b.getBlockFile();
-          File sourceMeta = b.getMetaFile();
+          File sourceBlock = new File(b.getBlockURI());
+          File sourceMeta = new File(b.getMetadataURI());
           String sourceRoot = b.getVolume().getBasePath();
           String destRoot = v.getBasePath();
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java

@@ -80,7 +80,7 @@ public class TestSimulatedFSDataset {
       ExtendedBlock b = new ExtendedBlock(bpid, blkID, 0, 0);
       // we pass expected len as zero, - fsdataset should use the sizeof actual
       // data written
-      ReplicaInPipelineInterface bInfo = fsdataset.createRbw(
+      ReplicaInPipeline bInfo = fsdataset.createRbw(
           StorageType.DEFAULT, b, false).getReplica();
       ReplicaOutputStreams out = bInfo.createStreams(true,
           DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java

@@ -57,7 +57,7 @@ public class TestTransferRbw {
       String bpid) throws InterruptedException {
     return (ReplicaBeingWritten)getReplica(datanode, bpid, ReplicaState.RBW);
   }
-  private static ReplicaInPipeline getReplica(final DataNode datanode,
+  private static LocalReplicaInPipeline getReplica(final DataNode datanode,
       final String bpid, final ReplicaState expectedState) throws InterruptedException {
     final Collection<ReplicaInfo> replicas = FsDatasetTestUtil.getReplicas(
         datanode.getFSDataset(), bpid);
@@ -68,7 +68,7 @@ public class TestTransferRbw {
     Assert.assertEquals(1, replicas.size());
     final ReplicaInfo r = replicas.iterator().next();
     Assert.assertEquals(expectedState, r.getState());
-    return (ReplicaInPipeline)r;
+    return (LocalReplicaInPipeline)r;
   }
 
   @Test

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java

@@ -87,12 +87,12 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public List<FinalizedReplica> getFinalizedBlocks(String bpid) {
+  public List<ReplicaInfo> getFinalizedBlocks(String bpid) {
     return null;
   }
 
   @Override
-  public List<FinalizedReplica> getFinalizedBlocksOnPersistentStorage(String bpid) {
+  public List<ReplicaInfo> getFinalizedBlocksOnPersistentStorage(String bpid) {
     return null;
   }
 
@@ -159,7 +159,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   }
 
   @Override
-  public ReplicaInPipelineInterface convertTemporaryToRbw(
+  public ReplicaInPipeline convertTemporaryToRbw(
       ExtendedBlock temporary) throws IOException {
     return new ExternalReplicaInPipeline();
   }

+ 24 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java

@@ -23,11 +23,12 @@ import java.io.OutputStream;
 
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.ChunkChecksum;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.util.DataChecksum;
 
-public class ExternalReplicaInPipeline implements ReplicaInPipelineInterface {
+public class ExternalReplicaInPipeline implements ReplicaInPipeline {
 
   @Override
   public void setNumBytes(long bytesReceived) {
@@ -105,4 +106,25 @@ public class ExternalReplicaInPipeline implements ReplicaInPipelineInterface {
   public boolean isOnTransientStorage() {
     return false;
   }
+
+  @Override
+  public ReplicaInfo getReplicaInfo() {
+    return null;
+  }
+
+  public void setWriter(Thread writer) {
+  }
+
+  public void stopWriter(long xceiverStopTimeout)
+      throws IOException {
+  }
+
+  @Override
+  public boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) {
+    return false;
+  }
+
+  @Override
+  public void interruptThread() {
+  }
 }

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java

@@ -19,7 +19,7 @@
 package org.apache.hadoop.hdfs.server.datanode.extdataset;
 
 import org.apache.hadoop.hdfs.server.datanode.Replica;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.junit.Test;
@@ -75,7 +75,7 @@ public class TestExternalDataset {
    */
   @Test
   public void testInstantiateReplicaInPipeline() throws Throwable {
-    ReplicaInPipelineInterface inst = new ExternalReplicaInPipeline();
+    ReplicaInPipeline inst = new ExternalReplicaInPipeline();
   }
 
   /**

+ 21 - 22
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java

@@ -26,8 +26,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
@@ -35,11 +35,11 @@ import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
+import org.apache.hadoop.hdfs.server.datanode.LocalReplicaInPipeline;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -205,8 +205,8 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
     dataset = (FsDatasetImpl) datanode.getFSDataset();
   }
 
-  private File getBlockFile(ExtendedBlock eb) throws IOException {
-    return dataset.getBlockFile(eb.getBlockPoolId(), eb.getBlockId());
+  private ReplicaInfo getBlockFile(ExtendedBlock eb) throws IOException {
+    return dataset.getReplicaInfo(eb);
   }
 
   /**
@@ -217,8 +217,8 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
       throws ReplicaNotFoundException {
     File blockFile;
     try {
-       blockFile = dataset.getBlockFile(
-           block.getBlockPoolId(), block.getBlockId());
+      ReplicaInfo r = dataset.getReplicaInfo(block);
+      blockFile = new File(r.getBlockURI());
     } catch (IOException e) {
       LOG.error("Block file for " + block + " does not existed:", e);
       throw new ReplicaNotFoundException(block);
@@ -240,7 +240,7 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
   public Replica createFinalizedReplica(FsVolumeSpi volume, ExtendedBlock block)
       throws IOException {
     FsVolumeImpl vol = (FsVolumeImpl) volume;
-    ReplicaInfo info = new FinalizedReplica(block.getLocalBlock(), vol,
+    FinalizedReplica info = new FinalizedReplica(block.getLocalBlock(), vol,
         vol.getCurrentDir().getParentFile());
     dataset.volumeMap.add(block.getBlockPoolId(), info);
     info.getBlockFile().createNewFile();
@@ -260,7 +260,7 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
   public Replica createReplicaInPipeline(
       FsVolumeSpi volume, ExtendedBlock block) throws IOException {
     FsVolumeImpl vol = (FsVolumeImpl) volume;
-    ReplicaInPipeline rip = new ReplicaInPipeline(
+    LocalReplicaInPipeline rip = new LocalReplicaInPipeline(
         block.getBlockId(), block.getGenerationStamp(), volume,
         vol.createTmpFile(
             block.getBlockPoolId(), block.getLocalBlock()).getParentFile(),
@@ -305,9 +305,11 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
     FsVolumeImpl vol = (FsVolumeImpl) volume;
     final String bpid = eb.getBlockPoolId();
     final Block block = eb.getLocalBlock();
-    ReplicaWaitingToBeRecovered rwbr =
-        new ReplicaWaitingToBeRecovered(eb.getLocalBlock(), volume,
-            vol.createRbwFile(bpid, block).getParentFile());
+    ReplicaInfo rwbr = new ReplicaBuilder(ReplicaState.RWR)
+        .setBlock(eb.getLocalBlock())
+        .setFsVolume(volume)
+        .setDirectoryToUse(vol.createRbwFile(bpid, block).getParentFile())
+        .build();
     dataset.volumeMap.add(bpid, rwbr);
     return rwbr;
   }
@@ -354,6 +356,7 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
             "Meta file " + metaFile + " already exists."
         );
       }
+      dataset.volumeMap.add(block.getBlockPoolId(), finalized);
     }
   }
 
@@ -379,25 +382,21 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
 
   @Override
   public long getStoredDataLength(ExtendedBlock block) throws IOException {
-    File f = getBlockFile(block);
-    try (RandomAccessFile raf = new RandomAccessFile(f, "r")) {
-      return raf.length();
-    }
+    ReplicaInfo r = getBlockFile(block);
+    return r.getBlockDataLength();
   }
 
   @Override
   public long getStoredGenerationStamp(ExtendedBlock block) throws IOException {
-    File f = getBlockFile(block);
-    File dir = f.getParentFile();
-    File[] files = FileUtil.listFiles(dir);
-    return FsDatasetUtil.getGenerationStampFromFile(files, f);
+    ReplicaInfo r = getBlockFile(block);
+    return r.getGenerationStamp();
   }
 
   @Override
   public void changeStoredGenerationStamp(
       ExtendedBlock block, long newGenStamp) throws IOException {
-    File blockFile =
-        dataset.getBlockFile(block.getBlockPoolId(), block.getBlockId());
+    ReplicaInfo r = dataset.getReplicaInfo(block);
+    File blockFile = new File(r.getBlockURI());
     File metaFile = FsDatasetUtil.findMetaFile(blockFile);
     File newMetaFile = new File(
         DatanodeUtil.getMetaName(blockFile.getAbsolutePath(), newGenStamp));

+ 15 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java

@@ -24,16 +24,16 @@ import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
 import java.util.Collection;
-import java.util.Random;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.io.IOUtils;
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
@@ -41,12 +41,21 @@ import static org.junit.Assert.fail;
 public class FsDatasetTestUtil {
 
   public static File getFile(FsDatasetSpi<?> fsd, String bpid, long bid) {
-    return ((FsDatasetImpl)fsd).getFile(bpid, bid, false);
+    ReplicaInfo r;
+    try {
+      r = ((FsDatasetImpl)fsd).getReplicaInfo(bpid, bid);
+      return new File(r.getBlockURI());
+    } catch (ReplicaNotFoundException e) {
+      FsDatasetImpl.LOG.warn(String.format(
+          "Replica with id %d was not found in block pool %s.", bid, bpid), e);
+    }
+    return null;
   }
 
   public static File getBlockFile(FsDatasetSpi<?> fsd, String bpid, Block b
       ) throws IOException {
-    return ((FsDatasetImpl)fsd).getBlockFile(bpid, b.getBlockId());
+    ReplicaInfo r = ((FsDatasetImpl)fsd).getReplicaInfo(bpid, b.getBlockId());
+    return new File(r.getBlockURI());
   }
 
   public static File getMetaFile(FsDatasetSpi<?> fsd, String bpid, Block b)
@@ -57,7 +66,8 @@ public class FsDatasetTestUtil {
 
   public static boolean breakHardlinksIfNeeded(FsDatasetSpi<?> fsd,
       ExtendedBlock block) throws IOException {
-    final ReplicaInfo info = ((FsDatasetImpl)fsd).getReplicaInfo(block);
+    final LocalReplica info =
+        (LocalReplica) ((FsDatasetImpl)fsd).getReplicaInfo(block);
     return info.breakHardLinksIfNeeded();
   }
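
Illustrative sketch (not part of the commit): the URI-based accessors that replace the old File getters throughout these test helpers; 'fsd', 'bpid' and 'blockId' are placeholders, and ReplicaNotFoundException handling follows the getFile() hunk above.

    // Hypothetical fragment; resolves block and meta files through ReplicaInfo URIs.
    ReplicaInfo r = ((FsDatasetImpl) fsd).getReplicaInfo(bpid, blockId);
    File blockFile = new File(r.getBlockURI());
    File metaFile = new File(r.getMetadataURI());
    long storedLength = r.getBlockDataLength(); // replaces RandomAccessFile.length()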
 

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java

@@ -36,7 +36,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -483,7 +483,7 @@ public class TestWriteToReplica {
     long newGenStamp = blocks[NON_EXISTENT].getGenerationStamp() * 10;
     blocks[NON_EXISTENT].setGenerationStamp(newGenStamp);
     try {
-      ReplicaInPipelineInterface replicaInfo =
+      ReplicaInPipeline replicaInfo =
           dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]).getReplica();
       Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp);
       Assert.assertTrue(

Some files were not shown because too many files changed in this diff