HDFS-7310. Mover can give first priority to local DN if it has target storage type available in local DN. (Vinayakumar B via umamahesh)

Uma Maheswara Rao G, 10 years ago
Commit: 058af60c56
17 files changed, 397 insertions(+), 150 deletions(-)
  1. hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (+3, -0)
  2. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java (+4, -13)
  3. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (+12, -6)
  4. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (+4, -2)
  5. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java (+9, -5)
  6. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (+75, -66)
  7. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (+7, -0)
  8. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java (+4, -0)
  9. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (+104, -30)
  10. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java (+4, -0)
  11. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java (+1, -1)
  12. hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java (+27, -2)
  13. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java (+3, -1)
  14. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java (+4, -4)
  15. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (+7, -0)
  16. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (+82, -19)
  17. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java (+47, -1)

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -401,6 +401,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-6803 Document DFSClient#DFSInputStream expectations reading and preading
     in concurrent context. (stack via stevel)
 
+    HDFS-7310. Mover can give first priority to local DN if it has target storage type 
+    available in local DN. (Vinayakumar B via umamahesh)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 4 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java

@@ -243,6 +243,10 @@ public class Dispatcher {
      */
     private boolean chooseProxySource() {
       final DatanodeInfo targetDN = target.getDatanodeInfo();
+      // if source and target are same nodes then no need of proxy
+      if (source.getDatanodeInfo().equals(targetDN) && addTo(source)) {
+        return true;
+      }
       // if node group is supported, first try add nodes in the same node group
       if (cluster.isNodeGroupAware()) {
         for (StorageGroup loc : block.getLocations()) {
@@ -375,19 +379,6 @@ public class Dispatcher {
     public DBlock(Block block) {
       super(block);
     }
-
-    @Override
-    public synchronized boolean isLocatedOn(StorageGroup loc) {
-      // currently we only check if replicas are located on the same DataNodes
-      // since we do not have the capability to store two replicas in the same
-      // DataNode even though they are on two different storage types
-      for (StorageGroup existing : locations) {
-        if (existing.getDatanodeInfo().equals(loc.getDatanodeInfo())) {
-          return true;
-        }
-      }
-      return false;
-    }
   }
 
   /** The class represents a desired move. */
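
Two things happen in this file: chooseProxySource now short-circuits when source and target are the same DataNode, since a local cross-storage move needs no proxy at all, and DBlock's override of isLocatedOn is removed because a block may now legitimately sit on two storages of one DataNode while a move is in flight. With the override gone, DBlock falls back to the base Locations behavior, which is roughly (a sketch of the assumed base class, not part of this diff):

    // MovedBlocks.Locations-style check: per storage group, not per DataNode,
    // so the same DataNode may hold the block on two storage types mid-move.
    public synchronized boolean isLocatedOn(StorageGroup loc) {
      return locations.contains(loc);
    }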

+ 12 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -54,7 +54,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.fs.FileEncryptionInfo;
-
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
@@ -63,6 +62,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.Acces
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -2000,8 +2000,9 @@ public class BlockManager {
     // place a delimiter in the list which separates blocks 
     // that have been reported from those that have not
     BlockInfo delimiter = new BlockInfo(new Block(), (short) 1);
-    boolean added = storageInfo.addBlock(delimiter);
-    assert added : "Delimiting block cannot be present in the node";
+    AddBlockResult result = storageInfo.addBlock(delimiter);
+    assert result == AddBlockResult.ADDED 
+        : "Delimiting block cannot be present in the node";
     int headIndex = 0; //currently the delimiter is in the head of the list
     int curIndex;
 
@@ -2394,14 +2395,19 @@ public class BlockManager {
     assert bc != null : "Block must belong to a file";
 
     // add block to the datanode
-    boolean added = storageInfo.addBlock(storedBlock);
+    AddBlockResult result = storageInfo.addBlock(storedBlock);
 
     int curReplicaDelta;
-    if (added) {
+    if (result == AddBlockResult.ADDED) {
       curReplicaDelta = 1;
       if (logEveryBlock) {
         logAddStoredBlock(storedBlock, node);
       }
+    } else if (result == AddBlockResult.REPLACED) {
+      curReplicaDelta = 0;
+      blockLog.warn("BLOCK* addStoredBlock: " + "block " + storedBlock
+          + " moved to storageType " + storageInfo.getStorageType()
+          + " on node " + node);
     } else {
       // if the same block is added again and the replica was corrupt
       // previously because of a wrong gen stamp, remove it from the
@@ -2423,7 +2429,7 @@ public class BlockManager {
     if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
         numLiveReplicas >= minReplication) {
       storedBlock = completeBlock(bc, storedBlock, false);
-    } else if (storedBlock.isComplete() && added) {
+    } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
       // check whether safe replication is reached for the block
       // only complete blocks are counted towards that
       // Is no-op if not in safe mode.

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import java.util.Iterator;
 
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.LightWeightGSet;
@@ -223,8 +224,9 @@ class BlocksMap {
       final boolean removed = storage.removeBlock(currentBlock);
       Preconditions.checkState(removed, "currentBlock not found.");
 
-      final boolean added = storage.addBlock(newBlock);
-      Preconditions.checkState(added, "newBlock already exists.");
+      final AddBlockResult result = storage.addBlock(newBlock);
+      Preconditions.checkState(result == AddBlockResult.ADDED,
+          "newBlock already exists.");
     }
     // replace block in the map itself
     blocks.put(newBlock);

+ 9 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java

@@ -215,10 +215,10 @@ public class DatanodeStorageInfo {
     return blockPoolUsed;
   }
 
-  public boolean addBlock(BlockInfo b) {
+  public AddBlockResult addBlock(BlockInfo b) {
     // First check whether the block belongs to a different storage
     // on the same DN.
-    boolean replaced = false;
+    AddBlockResult result = AddBlockResult.ADDED;
     DatanodeStorageInfo otherStorage =
         b.findStorageInfo(getDatanodeDescriptor());
 
@@ -226,10 +226,10 @@ public class DatanodeStorageInfo {
       if (otherStorage != this) {
         // The block belongs to a different storage. Remove it first.
         otherStorage.removeBlock(b);
-        replaced = true;
+        result = AddBlockResult.REPLACED;
       } else {
         // The block is already associated with this storage.
-        return false;
+        return AddBlockResult.ALREADY_EXIST;
       }
     }
 
@@ -237,7 +237,7 @@ public class DatanodeStorageInfo {
     b.addStorage(this);
     blockList = b.listInsert(blockList, this);
     numBlocks++;
-    return !replaced;
+    return result;
   }
 
   boolean removeBlock(BlockInfo b) {
@@ -358,4 +358,8 @@ public class DatanodeStorageInfo {
     }
     return null;
   }
+
+  static enum AddBlockResult {
+    ADDED, REPLACED, ALREADY_EXIST;
+  }
 }
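
addBlock now reports one of three outcomes instead of a boolean, which lets callers tell a cross-storage move on the same DataNode apart from a duplicate report. A minimal sketch of the branching, condensed from the BlockManager hunk above (variable names are illustrative):

    AddBlockResult result = storageInfo.addBlock(storedBlock);
    if (result == AddBlockResult.ADDED) {
      // brand-new replica on this storage: live-replica count goes up by one
    } else if (result == AddBlockResult.REPLACED) {
      // same DataNode, different storage: the replica moved, count unchanged
    } else {
      // AddBlockResult.ALREADY_EXIST: duplicate report for this exact storage
    }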

+ 75 - 66
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -1007,7 +1007,6 @@ class DataXceiver extends Receiver implements Runnable {
     updateCurrentThreadName("Replacing block " + block + " from " + delHint);
 
     /* read header */
-    block.setNumBytes(dataXceiverServer.estimateBlockSize);
     if (datanode.isBlockTokenEnabled) {
       try {
         datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
@@ -1039,73 +1038,83 @@ class DataXceiver extends Receiver implements Runnable {
     DataOutputStream replyOut = new DataOutputStream(getOutputStream());
     boolean IoeDuringCopyBlockOperation = false;
     try {
-      // get the output stream to the proxy
-      final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Connecting to datanode " + dnAddr);
-      }
-      InetSocketAddress proxyAddr = NetUtils.createSocketAddr(dnAddr);
-      proxySock = datanode.newSocket();
-      NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
-      proxySock.setSoTimeout(dnConf.socketTimeout);
-
-      OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
-          dnConf.socketWriteTimeout);
-      InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
-      DataEncryptionKeyFactory keyFactory =
-        datanode.getDataEncryptionKeyFactoryForBlock(block);
-      IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock,
-        unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource);
-      unbufProxyOut = saslStreams.out;
-      unbufProxyIn = saslStreams.in;
-      
-      proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, 
-          HdfsConstants.SMALL_BUFFER_SIZE));
-      proxyReply = new DataInputStream(new BufferedInputStream(unbufProxyIn,
-          HdfsConstants.IO_FILE_BUFFER_SIZE));
-
-      /* send request to the proxy */
-      IoeDuringCopyBlockOperation = true;
-      new Sender(proxyOut).copyBlock(block, blockToken);
-      IoeDuringCopyBlockOperation = false;
-
-      // receive the response from the proxy
-      
-      BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
-          PBHelper.vintPrefixed(proxyReply));
-
-      if (copyResponse.getStatus() != SUCCESS) {
-        if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) {
+      // Move the block to different storage in the same datanode
+      if (proxySource.equals(datanode.getDatanodeId())) {
+        ReplicaInfo oldReplica = datanode.data.moveBlockAcrossStorage(block,
+            storageType);
+        if (oldReplica != null) {
+          LOG.info("Moved " + block + " from StorageType "
+              + oldReplica.getVolume().getStorageType() + " to " + storageType);
+        }
+      } else {
+        block.setNumBytes(dataXceiverServer.estimateBlockSize);
+        // get the output stream to the proxy
+        final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Connecting to datanode " + dnAddr);
+        }
+        InetSocketAddress proxyAddr = NetUtils.createSocketAddr(dnAddr);
+        proxySock = datanode.newSocket();
+        NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
+        proxySock.setSoTimeout(dnConf.socketTimeout);
+        
+        OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
+            dnConf.socketWriteTimeout);
+        InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
+        DataEncryptionKeyFactory keyFactory =
+            datanode.getDataEncryptionKeyFactoryForBlock(block);
+        IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock,
+            unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource);
+        unbufProxyOut = saslStreams.out;
+        unbufProxyIn = saslStreams.in;
+        
+        proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, 
+            HdfsConstants.SMALL_BUFFER_SIZE));
+        proxyReply = new DataInputStream(new BufferedInputStream(unbufProxyIn,
+            HdfsConstants.IO_FILE_BUFFER_SIZE));
+        
+        /* send request to the proxy */
+        IoeDuringCopyBlockOperation = true;
+        new Sender(proxyOut).copyBlock(block, blockToken);
+        IoeDuringCopyBlockOperation = false;
+        
+        // receive the response from the proxy
+        
+        BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
+            PBHelper.vintPrefixed(proxyReply));
+        
+        if (copyResponse.getStatus() != SUCCESS) {
+          if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) {
+            throw new IOException("Copy block " + block + " from "
+                + proxySock.getRemoteSocketAddress()
+                + " failed due to access token error");
+          }
           throw new IOException("Copy block " + block + " from "
-              + proxySock.getRemoteSocketAddress()
-              + " failed due to access token error");
+              + proxySock.getRemoteSocketAddress() + " failed");
         }
-        throw new IOException("Copy block " + block + " from "
-            + proxySock.getRemoteSocketAddress() + " failed");
+        
+        // get checksum info about the block we're copying
+        ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
+        DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
+            checksumInfo.getChecksum());
+        // open a block receiver and check if the block does not exist
+        blockReceiver = new BlockReceiver(block, storageType,
+            proxyReply, proxySock.getRemoteSocketAddress().toString(),
+            proxySock.getLocalSocketAddress().toString(),
+            null, 0, 0, 0, "", null, datanode, remoteChecksum,
+            CachingStrategy.newDropBehind(), false);
+        
+        // receive a block
+        blockReceiver.receiveBlock(null, null, replyOut, null, 
+            dataXceiverServer.balanceThrottler, null, true);
+        
+        // notify name node
+        datanode.notifyNamenodeReceivedBlock(
+            block, delHint, blockReceiver.getStorageUuid());
+        
+        LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()
+            + ", delHint=" + delHint);
       }
-      
-      // get checksum info about the block we're copying
-      ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
-      DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
-          checksumInfo.getChecksum());
-      // open a block receiver and check if the block does not exist
-      blockReceiver = new BlockReceiver(block, storageType,
-          proxyReply, proxySock.getRemoteSocketAddress().toString(),
-          proxySock.getLocalSocketAddress().toString(),
-          null, 0, 0, 0, "", null, datanode, remoteChecksum,
-          CachingStrategy.newDropBehind(), false);
-
-      // receive a block
-      blockReceiver.receiveBlock(null, null, replyOut, null, 
-          dataXceiverServer.balanceThrottler, null, true);
-                    
-      // notify name node
-      datanode.notifyNamenodeReceivedBlock(
-          block, delHint, blockReceiver.getStorageUuid());
-
-      LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()
-          + ", delHint=" + delHint);
-      
     } catch (IOException ioe) {
       opStatus = ERROR;
       errMsg = "opReplaceBlock " + block + " received exception " + ioe; 
@@ -1117,7 +1126,7 @@ class DataXceiver extends Receiver implements Runnable {
       throw ioe;
     } finally {
       // receive the last byte that indicates the proxy released its thread resource
-      if (opStatus == SUCCESS) {
+      if (opStatus == SUCCESS && proxyReply != null) {
         try {
           proxyReply.readChar();
         } catch (IOException ignored) {

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
@@ -508,4 +509,10 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
     * Callback from RamDiskAsyncLazyPersistService upon async lazy persist task fail
     */
    public void onFailLazyPersist(String bpId, long blockId);
+
+    /**
+     * Move block from one storage to another storage
+     */
+    public ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
+        StorageType targetStorageType) throws IOException;
 }
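
The implementation arrives in FsDatasetImpl below; a hedged sketch of the contract a caller can rely on, with the exception types taken from that implementation (the fsDataset variable is illustrative):

    try {
      ReplicaInfo old = fsDataset.moveBlockAcrossStorage(block, StorageType.ARCHIVE);
      // 'old' describes the replica on its previous volume
    } catch (ReplicaNotFoundException e) {
      // no such replica, or the replica is not FINALIZED
    } catch (ReplicaAlreadyExistsException e) {
      // the replica already lives on the target storage type
    } catch (IOException e) {
      // length mismatch, RAM_DISK source, or a failed file copy
    }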

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java

@@ -157,6 +157,10 @@ class BlockPoolSlice {
     return rbwDir;
   }
 
+  File getTmpDir() {
+    return tmpDir;
+  }
+
   /** Run DU on local drives.  It must be synchronized from caller. */
   void decDfsUsed(long value) {
     dfsUsage.decDfsUsed(value);

+ 104 - 30
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -663,13 +663,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
    * @return the new meta and block files.
    * @throws IOException
    */
-  static File[] copyBlockFiles(long blockId, long genStamp,
-                               File srcMeta, File srcFile, File destRoot)
+  static File[] copyBlockFiles(long blockId, long genStamp, File srcMeta,
+      File srcFile, File destRoot, boolean calculateChecksum)
       throws IOException {
     final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
     final File dstFile = new File(destDir, srcFile.getName());
     final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
-    computeChecksum(srcMeta, dstMeta, srcFile);
+    if (calculateChecksum) {
+      computeChecksum(srcMeta, dstMeta, srcFile);
+    } else {
+      try {
+        Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
+      } catch (IOException e) {
+        throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
+      }
+    }
 
     try {
       Storage.nativeCopyFileUnbuffered(srcFile, dstFile, true);
@@ -677,13 +685,72 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Copied " + srcMeta + " to " + dstMeta +
-          " and calculated checksum");
-      LOG.debug("Copied " + srcFile + " to " + dstFile);
+      if (calculateChecksum) {
+        LOG.debug("Copied " + srcMeta + " to " + dstMeta
+            + " and calculated checksum");
+      } else {
+        LOG.debug("Copied " + srcFile + " to " + dstFile);
+      }
     }
     return new File[] {dstMeta, dstFile};
   }
 
+  /**
+   * Move block files from one storage to another storage.
+   * @return the old ReplicaInfo
+   * @throws IOException
+   */
+  @Override
+  public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
+      StorageType targetStorageType) throws IOException {
+    ReplicaInfo replicaInfo = getReplicaInfo(block);
+    if (replicaInfo.getState() != ReplicaState.FINALIZED) {
+      throw new ReplicaNotFoundException(
+          ReplicaNotFoundException.UNFINALIZED_REPLICA + block);
+    }
+    if (replicaInfo.getNumBytes() != block.getNumBytes()) {
+      throw new IOException("Corrupted replica " + replicaInfo
+          + " with a length of " + replicaInfo.getNumBytes()
+          + " expected length is " + block.getNumBytes());
+    }
+    if (replicaInfo.getVolume().getStorageType() == targetStorageType) {
+      throw new ReplicaAlreadyExistsException("Replica " + replicaInfo
+          + " already exists on storage " + targetStorageType);
+    }
+
+    if (replicaInfo.isOnTransientStorage()) {
+      // Block movement from RAM_DISK will be done by LazyPersist mechanism
+      throw new IOException("Replica " + replicaInfo
+          + " cannot be moved from storageType : "
+          + replicaInfo.getVolume().getStorageType());
+    }
+
+    FsVolumeImpl targetVolume = volumes.getNextVolume(targetStorageType,
+        block.getNumBytes());
+    File oldBlockFile = replicaInfo.getBlockFile();
+    File oldMetaFile = replicaInfo.getMetaFile();
+
+    // Copy files to temp dir first
+    File[] blockFiles = copyBlockFiles(block.getBlockId(),
+        block.getGenerationStamp(), oldMetaFile, oldBlockFile,
+        targetVolume.getTmpDir(block.getBlockPoolId()),
+        replicaInfo.isOnTransientStorage());
+
+    ReplicaInfo newReplicaInfo = new ReplicaInPipeline(
+        replicaInfo.getBlockId(), replicaInfo.getGenerationStamp(),
+        targetVolume, blockFiles[0].getParentFile(), 0);
+    newReplicaInfo.setNumBytes(blockFiles[1].length());
+    // Finalize the copied files
+    newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
+
+    removeOldReplica(replicaInfo, newReplicaInfo, oldBlockFile, oldMetaFile,
+        oldBlockFile.length(), oldMetaFile.length(), block.getBlockPoolId());
+
+    // Replace the old block if any to reschedule the scanning.
+    datanode.getBlockScanner().addBlock(block);
+    return replicaInfo;
+  }
+
   /**
    * Compute and store the checksum for a block file that does not already have
    * its checksum computed.
@@ -2442,6 +2509,35 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
+  private void removeOldReplica(ReplicaInfo replicaInfo,
+      ReplicaInfo newReplicaInfo, File blockFile, File metaFile,
+      long blockFileUsed, long metaFileUsed, final String bpid) {
+    // Before deleting the files from old storage we must notify the
+    // NN that the files are on the new storage. Else a blockReport from
+    // the transient storage might cause the NN to think the blocks are lost.
+    // Replicas must be evicted from client short-circuit caches, because the
+    // storage will no longer be same, and thus will require validating
+    // checksum.  This also stops a client from holding file descriptors,
+    // which would prevent the OS from reclaiming the memory.
+    ExtendedBlock extendedBlock =
+        new ExtendedBlock(bpid, newReplicaInfo);
+    datanode.getShortCircuitRegistry().processBlockInvalidation(
+        ExtendedBlockId.fromExtendedBlock(extendedBlock));
+    datanode.notifyNamenodeReceivedBlock(
+        extendedBlock, null, newReplicaInfo.getStorageUuid());
+
+    // Remove the old replicas
+    if (blockFile.delete() || !blockFile.exists()) {
+      ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed);
+      if (metaFile.delete() || !metaFile.exists()) {
+        ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed);
+      }
+    }
+
+    // If deletion failed then the directory scanner will cleanup the blocks
+    // eventually.
+  }
+
   class LazyWriter implements Runnable {
     private volatile boolean shouldRun = true;
     final int checkpointerInterval;
@@ -2601,30 +2697,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           }
         }
 
-        // Before deleting the files from transient storage we must notify the
-        // NN that the files are on the new storage. Else a blockReport from
-        // the transient storage might cause the NN to think the blocks are lost.
-        // Replicas must be evicted from client short-circuit caches, because the
-        // storage will no longer be transient, and thus will require validating
-        // checksum.  This also stops a client from holding file descriptors,
-        // which would prevent the OS from reclaiming the memory.
-        ExtendedBlock extendedBlock =
-            new ExtendedBlock(bpid, newReplicaInfo);
-        datanode.getShortCircuitRegistry().processBlockInvalidation(
-            ExtendedBlockId.fromExtendedBlock(extendedBlock));
-        datanode.notifyNamenodeReceivedBlock(
-            extendedBlock, null, newReplicaInfo.getStorageUuid());
-
-        // Remove the old replicas from transient storage.
-        if (blockFile.delete() || !blockFile.exists()) {
-          ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed);
-          if (metaFile.delete() || !metaFile.exists()) {
-            ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed);
-          }
-        }
-
-        // If deletion failed then the directory scanner will cleanup the blocks
-        // eventually.
+        removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile,
+            blockFileUsed, metaFileUsed, bpid);
       }
     }
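
The move itself is copy-then-finalize-then-delete, and the checksum flag passed to copyBlockFiles is replicaInfo.isOnTransientStorage(), which is always false at that point because transient replicas were rejected just above. A condensed sketch of the sequence (argument names shortened for illustration):

    File[] copies = copyBlockFiles(blockId, genStamp, oldMeta, oldBlock,
        targetVolume.getTmpDir(bpid), false);   // raw copy, no checksum recompute
    ReplicaInfo moved = finalizeReplica(bpid,
        new ReplicaInPipeline(blockId, genStamp, targetVolume,
            copies[0].getParentFile(), 0));
    // removeOldReplica tells the NN and the short-circuit registry about the
    // new location *before* deleting the old files, so a block report cannot
    // make the block look lost; failed deletions are left to the scanner.
    removeOldReplica(oldReplica, moved, oldBlock, oldMeta,
        oldBlock.length(), oldMeta.length(), bpid);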
 

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java

@@ -129,6 +129,10 @@ public class FsVolumeImpl implements FsVolumeSpi {
     return getBlockPoolSlice(bpid).getLazypersistDir();
   }
 
+  File getTmpDir(String bpid) throws IOException {
+    return getBlockPoolSlice(bpid).getTmpDir();
+  }
+
   void decDfsUsed(String bpid, long value) {
     synchronized(dataset) {
       BlockPoolSlice bp = bpSlices.get(bpid);

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java

@@ -232,7 +232,7 @@ class RamDiskAsyncLazyPersistService {
       try {
         // No FsDatasetImpl lock for the file copy
         File targetFiles[] = FsDatasetImpl.copyBlockFiles(
-            blockId, genStamp, metaFile, blockFile, lazyPersistDir);
+            blockId, genStamp, metaFile, blockFile, lazyPersistDir, true);
 
         // Lock FsDataSetImpl during onCompleteLazyPersist callback
         datanode.getFSDataset().onCompleteLazyPersist(bpId, blockId,

+ 27 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java

@@ -86,8 +86,8 @@ public class Mover {
       return get(sources, ml);
     }
 
-    private StorageGroup getTarget(MLocation ml) {
-      return get(targets, ml);
+    private StorageGroup getTarget(String uuid, StorageType storageType) {
+      return targets.get(uuid, storageType);
     }
 
     private static <G extends StorageGroup> G get(StorageGroupMap<G> map, MLocation ml) {
@@ -387,6 +387,11 @@ public class Mover {
 
     boolean scheduleMoveReplica(DBlock db, Source source,
         List<StorageType> targetTypes) {
+      // Match storage on the same node
+      if (chooseTargetInSameNode(db, source, targetTypes)) {
+        return true;
+      }
+
       if (dispatcher.getCluster().isNodeGroupAware()) {
         if (chooseTarget(db, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
           return true;
@@ -401,6 +406,26 @@ public class Mover {
       return chooseTarget(db, source, targetTypes, Matcher.ANY_OTHER);
     }
 
+    /**
+     * Choose the target storage within same Datanode if possible.
+     */
+    boolean chooseTargetInSameNode(DBlock db, Source source,
+        List<StorageType> targetTypes) {
+      for (StorageType t : targetTypes) {
+        StorageGroup target = storages.getTarget(source.getDatanodeInfo()
+            .getDatanodeUuid(), t);
+        if (target == null) {
+          continue;
+        }
+        final PendingMove pm = source.addPendingMove(db, target);
+        if (pm != null) {
+          dispatcher.executePendingMove(pm);
+          return true;
+        }
+      }
+      return false;
+    }
+
     boolean chooseTarget(DBlock db, Source source,
         List<StorageType> targetTypes, Matcher matcher) {
       final NetworkTopology cluster = dispatcher.getCluster(); 
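
Net effect on scheduling: the local DataNode is consulted before any topology matcher. The order after this patch, restated compactly from scheduleMoveReplica above:

    if (chooseTargetInSameNode(db, source, targetTypes)) {
      return true;                                  // 1. same DN, other storage
    }
    if (dispatcher.getCluster().isNodeGroupAware()
        && chooseTarget(db, source, targetTypes, Matcher.SAME_NODE_GROUP)) {
      return true;                                  // 2. same node group
    }
    if (chooseTarget(db, source, targetTypes, Matcher.SAME_RACK)) {
      return true;                                  // 3. same rack
    }
    return chooseTarget(db, source, targetTypes, Matcher.ANY_OTHER); // 4. any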

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java

@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.junit.Assert;
@@ -75,7 +76,8 @@ public class TestBlockInfo {
     }
 
     // Try to move one of the blocks to a different storage.
-    boolean added = storage2.addBlock(blockInfos[NUM_BLOCKS/2]);
+    boolean added =
+        storage2.addBlock(blockInfos[NUM_BLOCKS / 2]) == AddBlockResult.ADDED;
     Assert.assertThat(added, is(false));
     Assert.assertThat(blockInfos[NUM_BLOCKS/2].getStorageInfo(0), is(storage2));
   }

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java

@@ -25,6 +25,7 @@ import java.util.ArrayList;
 
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.junit.Test;
 
@@ -61,18 +62,17 @@ public class TestDatanodeDescriptor {
     BlockInfo blk1 = new BlockInfo(new Block(2L), (short) 2);
     DatanodeStorageInfo[] storages = dd.getStorageInfos();
     assertTrue(storages.length > 0);
-    final String storageID = storages[0].getStorageID();
     // add first block
-    assertTrue(storages[0].addBlock(blk));
+    assertTrue(storages[0].addBlock(blk) == AddBlockResult.ADDED);
     assertEquals(1, dd.numBlocks());
     // remove a non-existent block
     assertFalse(dd.removeBlock(blk1));
     assertEquals(1, dd.numBlocks());
     // add an existent block
-    assertFalse(storages[0].addBlock(blk));
+    assertFalse(storages[0].addBlock(blk) == AddBlockResult.ADDED);
     assertEquals(1, dd.numBlocks());
     // add second block
-    assertTrue(storages[0].addBlock(blk1));
+    assertTrue(storages[0].addBlock(blk1) == AddBlockResult.ADDED);
     assertEquals(2, dd.numBlocks());
     // remove first block
     assertTrue(dd.removeBlock(blk));

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -1260,5 +1260,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public void onFailLazyPersist(String bpId, long blockId) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
+      StorageType targetStorageType) throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
 }
 

+ 82 - 19
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java

@@ -17,15 +17,14 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -40,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.StorageType;
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@@ -200,7 +201,51 @@ public class TestBlockReplacement {
       cluster.shutdown();
     }
   }
-  
+
+  @Test
+  public void testBlockMoveAcrossStorageInSameNode() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    // create only one datanode in the cluster to verify movement within
+    // datanode.
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).storageTypes(
+            new StorageType[] { StorageType.DISK, StorageType.ARCHIVE })
+            .build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final Path file = new Path("/testBlockMoveAcrossStorageInSameNode/file");
+      DFSTestUtil.createFile(dfs, file, 1024, (short) 1, 1024);
+      LocatedBlocks locatedBlocks = dfs.getClient().getLocatedBlocks(file.toString(), 0);
+      // get the current located block
+      LocatedBlock locatedBlock = locatedBlocks.get(0);
+      ExtendedBlock block = locatedBlock.getBlock();
+      DatanodeInfo[] locations = locatedBlock.getLocations();
+      assertEquals(1, locations.length);
+      StorageType[] storageTypes = locatedBlock.getStorageTypes();
+      // current block should be written to DISK
+      assertTrue(storageTypes[0] == StorageType.DISK);
+      
+      DatanodeInfo source = locations[0];
+      // move block to ARCHIVE by using same DataNodeInfo for source, proxy and
+      // destination so that movement happens within datanode 
+      assertTrue(replaceBlock(block, source, source, source,
+          StorageType.ARCHIVE));
+      
+      // wait till namenode notified
+      Thread.sleep(3000);
+      locatedBlocks = dfs.getClient().getLocatedBlocks(file.toString(), 0);
+      // get the current 
+      locatedBlock = locatedBlocks.get(0);
+      assertEquals("Storage should be only one", 1,
+          locatedBlock.getLocations().length);
+      assertTrue("Block should be moved to ARCHIVE", locatedBlock
+          .getStorageTypes()[0] == StorageType.ARCHIVE);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /* check if file's blocks have expected number of replicas,
    * and exist at all of includeNodes
    */
@@ -259,24 +304,42 @@ public class TestBlockReplacement {
    */
   private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source,
       DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
+    return replaceBlock(block, source, sourceProxy, destination,
+        StorageType.DEFAULT);
+  }
+
+  /*
+   * Replace block
+   */
+  private boolean replaceBlock(
+      ExtendedBlock block,
+      DatanodeInfo source,
+      DatanodeInfo sourceProxy,
+      DatanodeInfo destination,
+      StorageType targetStorageType) throws IOException, SocketException {
     Socket sock = new Socket();
-    sock.connect(NetUtils.createSocketAddr(
-        destination.getXferAddr()), HdfsServerConstants.READ_TIMEOUT); 
-    sock.setKeepAlive(true);
-    // sendRequest
-    DataOutputStream out = new DataOutputStream(sock.getOutputStream());
-    new Sender(out).replaceBlock(block, StorageType.DEFAULT,
-        BlockTokenSecretManager.DUMMY_TOKEN,
-        source.getDatanodeUuid(), sourceProxy);
-    out.flush();
-    // receiveResponse
-    DataInputStream reply = new DataInputStream(sock.getInputStream());
+    try {
+      sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
+          HdfsServerConstants.READ_TIMEOUT);
+      sock.setKeepAlive(true);
+      // sendRequest
+      DataOutputStream out = new DataOutputStream(sock.getOutputStream());
+      new Sender(out).replaceBlock(block, targetStorageType,
+          BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(),
+          sourceProxy);
+      out.flush();
+      // receiveResponse
+      DataInputStream reply = new DataInputStream(sock.getInputStream());
 
-    BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply);
-    while (proto.getStatus() == Status.IN_PROGRESS) {
-      proto = BlockOpResponseProto.parseDelimitedFrom(reply);
+      BlockOpResponseProto proto =
+          BlockOpResponseProto.parseDelimitedFrom(reply);
+      while (proto.getStatus() == Status.IN_PROGRESS) {
+        proto = BlockOpResponseProto.parseDelimitedFrom(reply);
+      }
+      return proto.getStatus() == Status.SUCCESS;
+    } finally {
+      sock.close();
     }
-    return proto.getStatus() == Status.SUCCESS;
   }
 
   /**

+ 47 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java

@@ -31,7 +31,7 @@ import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.ToolRunner;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -79,6 +79,52 @@ public class TestMover {
     }
   }
 
+  @Test
+  public void testScheduleBlockWithinSameNode() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3)
+        .storageTypes(
+            new StorageType[] { StorageType.DISK, StorageType.ARCHIVE })
+        .build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final String file = "/testScheduleWithinSameNode/file";
+      Path dir = new Path("/testScheduleWithinSameNode");
+      dfs.mkdirs(dir);
+      // write to DISK
+      dfs.setStoragePolicy(dir, "HOT");
+      {
+        final FSDataOutputStream out = dfs.create(new Path(file));
+        out.writeChars("testScheduleWithinSameNode");
+        out.close();
+      }
+
+      //verify before movement
+      LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+      StorageType[] storageTypes = lb.getStorageTypes();
+      for (StorageType storageType : storageTypes) {
+        Assert.assertTrue(StorageType.DISK == storageType);
+      }
+      // move to ARCHIVE
+      dfs.setStoragePolicy(dir, "COLD");
+      int rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] { "-p", dir.toString() });
+      Assert.assertEquals("Movement to ARCHIVE should be successfull", 0, rc);
+
+      // Wait till namenode notified
+      Thread.sleep(3000);
+      lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+      storageTypes = lb.getStorageTypes();
+      for (StorageType storageType : storageTypes) {
+        Assert.assertTrue(StorageType.ARCHIVE == storageType);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   private void checkMovePaths(List<Path> actual, Path... expected) {
     Assert.assertEquals(expected.length, actual.size());
     for (Path p : expected) {
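
For reference, the end-to-end flow this test exercises can also be driven programmatically. A hedged usage sketch mirroring testScheduleBlockWithinSameNode (the /cold-data path is illustrative): mark a directory COLD, run the Mover, and with a matching ARCHIVE volume on the local DataNode the move now stays on-node.

    dfs.setStoragePolicy(new Path("/cold-data"), "COLD");
    int rc = ToolRunner.run(conf, new Mover.Cli(),
        new String[] { "-p", "/cold-data" });
    // rc == 0 when all scheduled moves complete successfully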