
HDFS-6133. Add a feature for replica pinning so that a pinned replica will not be moved by Balancer/Mover. Contributed by zhaoyunjiong

Tsz-Wo Nicholas Sze 10 years ago
parent commit 085b1e293f
21 changed files with 271 additions and 36 deletions
  1. +3 -0    hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. +4 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  3. +21 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
  4. +4 -3    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
  5. +5 -1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
  6. +3 -1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
  7. +6 -2    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
  8. +21 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
  9. +11 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
  10. +1 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
  11. +26 -8  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
  12. +13 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
  13. +40 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
  14. +3 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
  15. +6 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  16. +22 -5  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
  17. +1 -1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
  18. +60 -11 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
  19. +11 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
  20. +1 -1   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
  21. +9 -0   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
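
For orientation before the per-file diffs: a minimal client-side sketch of how the feature is meant to be used. It is not part of this commit; the hostnames, port, and file path are illustrative, and it assumes fs.defaultFS points at an HDFS cluster whose DataNodes run with dfs.datanode.block-pinning.enabled=true. The favored-nodes create() overload it calls is the same one the DFSTestUtil change below uses.

    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class PinnedCreateSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DistributedFileSystem dfs =
            (DistributedFileSystem) new Path("/").getFileSystem(conf);

        // Hint: place replicas on these DataNodes; with block pinning enabled
        // on them, the replicas written there are also pinned.
        InetSocketAddress[] favoredNodes = {
            new InetSocketAddress("dn1.example.com", 50010),  // illustrative
            new InetSocketAddress("dn2.example.com", 50010)
        };

        FSDataOutputStream out = dfs.create(new Path("/pinned-file"),
            FsPermission.getFileDefault(), true /* overwrite */, 4096,
            (short) 2, 128L * 1024 * 1024, null /* progress */, favoredNodes);
        try {
          out.write(new byte[1024]);
        } finally {
          out.close();
        }
      }
    }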

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -332,6 +332,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7720. Quota by Storage Type API, tools and ClientNameNode Protocol
     changes. (Xiaoyu Yao via Arpit Agarwal)
 
+    HDFS-6133. Add a feature for replica pinning so that a pinned replica
+    will not be moved by Balancer/Mover.  (zhaoyunjiong via szetszwo)
+
   IMPROVEMENTS
 
     HDFS-7055. Add tracing to DFSInputStream (cmccabe)

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -778,4 +778,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   // 10 days
   public static final long DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT =
       TimeUnit.DAYS.toMillis(10);
+  public static final String DFS_DATANODE_BLOCK_PINNING_ENABLED = 
+    "dfs.datanode.block-pinning.enabled";
+  public static final boolean DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT =
+    false;
 }

+ 21 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -1443,11 +1443,13 @@ public class DFSOutputStream extends FSOutputSummer
           ExtendedBlock blockCopy = new ExtendedBlock(block);
           blockCopy.setNumBytes(blockSize);
 
+          boolean[] targetPinnings = getPinnings(nodes);
           // send the request
           new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
               dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, 
               nodes.length, block.getNumBytes(), bytesSent, newGS,
-              checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);
+              checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
+            (targetPinnings == null ? false : targetPinnings[0]), targetPinnings);
   
           // receive ack for connect
           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1535,6 +1537,24 @@ public class DFSOutputStream extends FSOutputSummer
       }
     }
 
+    private boolean[] getPinnings(DatanodeInfo[] nodes) {
+      if (favoredNodes == null) {
+        return null;
+      } else {
+        boolean[] pinnings = new boolean[nodes.length];
+        for (int i = 0; i < nodes.length; i++) {
+          pinnings[i] = false;
+          for (int j = 0; j < favoredNodes.length; j++) {
+            if (nodes[i].getXferAddrWithHostname().equals(favoredNodes[j])) {
+              pinnings[i] = true;
+              break;
+            }
+          }
+        }
+        return pinnings;
+      }
+    }
+
     private LocatedBlock locateFollowingBlock(long start,
         DatanodeInfo[] excludedNodes)  throws IOException {
       int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
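
Note the matching rule in getPinnings() above: a pipeline node is pinned only when its transfer address, as returned by DatanodeInfo.getXferAddrWithHostname(), string-equals one of the favored-node entries. A simplified standalone restatement (not part of the commit; names and addresses are made up):

    import java.util.Arrays;

    public class PinningMatchSketch {
      // Same rule as DFSOutputStream.getPinnings(), with plain strings
      // standing in for DatanodeInfo.getXferAddrWithHostname().
      static boolean[] getPinnings(String[] xferAddrs, String[] favoredNodes) {
        boolean[] pinnings = new boolean[xferAddrs.length];
        for (int i = 0; i < xferAddrs.length; i++) {
          for (String favored : favoredNodes) {
            if (xferAddrs[i].equals(favored)) {
              pinnings[i] = true;
              break;
            }
          }
        }
        return pinnings;
      }

      public static void main(String[] args) {
        // The second favored node was given by hostname, but the pipeline
        // reports it by IP: exact string comparison means it is not pinned.
        String[] pipeline = { "dn1.example.com:50010", "10.0.0.2:50010" };
        String[] favored  = { "dn1.example.com:50010", "dn2.example.com:50010" };
        System.out.println(Arrays.toString(getPinnings(pipeline, favored)));
        // prints [true, false]
      }
    }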

+ 4 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -352,9 +352,10 @@ public class DistributedFileSystem extends FileSystem {
    * Progressable)} with the addition of favoredNodes that is a hint to 
    * where the namenode should place the file blocks.
    * The favored nodes hint is not persisted in HDFS. Hence it may be honored
-   * at the creation time only. HDFS could move the blocks during balancing or
-   * replication, to move the blocks from favored nodes. A value of null means
-   * no favored nodes for this create
+   * at the creation time only. And with favored nodes, blocks will be pinned
+   * on the datanodes to prevent balancing move the block. HDFS could move the
+   * blocks during replication, to move the blocks from favored nodes. A value
+   * of null means no favored nodes for this create
    */
   public HdfsDataOutputStream create(final Path f,
       final FsPermission permission, final boolean overwrite,

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java

@@ -92,6 +92,8 @@ public interface DataTransferProtocol {
    * @param minBytesRcvd minimum number of bytes received.
    * @param maxBytesRcvd maximum number of bytes received.
    * @param latestGenerationStamp the latest generation stamp of the block.
+   * @param pinning whether to pin the block, so Balancer won't move it.
+   * @param targetPinnings whether to pin the block on target datanode
    */
   public void writeBlock(final ExtendedBlock blk,
       final StorageType storageType, 
@@ -107,7 +109,9 @@ public interface DataTransferProtocol {
       final long latestGenerationStamp,
       final DataChecksum requestedChecksum,
       final CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException;
+      final boolean allowLazyPersist,
+      final boolean pinning,
+      final boolean[] targetPinnings) throws IOException;
   /**
    * Transfer a block to another datanode.
    * The block stage must be

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java

@@ -149,7 +149,9 @@ public abstract class Receiver implements DataTransferProtocol {
           (proto.hasCachingStrategy() ?
               getCachingStrategy(proto.getCachingStrategy()) :
             CachingStrategy.newDefaultStrategy()),
-          (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
+          (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),
+          (proto.hasPinning() ? proto.getPinning(): false),
+          (PBHelper.convertBooleanList(proto.getTargetPinningsList())));
     } finally {
      if (traceScope != null) traceScope.close();
     }

+ 6 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java

@@ -129,7 +129,9 @@ public class Sender implements DataTransferProtocol {
       final long latestGenerationStamp,
       DataChecksum requestedChecksum,
       final CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException {
+      final boolean allowLazyPersist,
+      final boolean pinning,
+      final boolean[] targetPinnings) throws IOException {
     ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
         blk, clientName, blockToken);
     
@@ -148,7 +150,9 @@ public class Sender implements DataTransferProtocol {
       .setLatestGenerationStamp(latestGenerationStamp)
       .setRequestedChecksum(checksumProto)
       .setCachingStrategy(getCachingStrategy(cachingStrategy))
-      .setAllowLazyPersist(allowLazyPersist);
+      .setAllowLazyPersist(allowLazyPersist)
+      .setPinning(pinning)
+      .addAllTargetPinnings(PBHelper.convert(targetPinnings, 1));
     
     if (source != null) {
       proto.setSource(PBHelper.convertDatanodeInfo(source));

+ 21 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -2960,4 +2960,25 @@ public class PBHelper {
         ezKeyVersionName);
   }
 
+  public static List<Boolean> convert(boolean[] targetPinnings, int idx) {
+    List<Boolean> pinnings = new ArrayList<Boolean>();
+    if (targetPinnings == null) {
+      pinnings.add(Boolean.FALSE);
+    } else {
+      for (; idx < targetPinnings.length; ++idx) {
+        pinnings.add(Boolean.valueOf(targetPinnings[idx]));
+      }
+    }
+    return pinnings;
+  }
+
+  public static boolean[] convertBooleanList(
+    List<Boolean> targetPinningsList) {
+    final boolean[] targetPinnings = new boolean[targetPinningsList.size()];
+    for (int i = 0; i < targetPinningsList.size(); i++) {
+      targetPinnings[i] = targetPinningsList.get(i);
+    }
+    return targetPinnings;
+  }
+
 }
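
The idx parameter of convert() is what carries the pinning flags down the write pipeline: the sender consumes the head of the array as the pinning field for its immediate target and serializes only the tail as targetPinnings, so each hop sees its own flag first. A hedged walk-through with made-up flags, using only the two helpers added above:

    import java.util.List;

    import org.apache.hadoop.hdfs.protocolPB.PBHelper;

    public class PinningPipelineSketch {
      public static void main(String[] args) {
        // Three-node pipeline; pin the 1st and 3rd replicas (illustrative).
        boolean[] pinnings = { true, false, true };

        // Client -> DN0: pinning = pinnings[0] (true);
        // wire list = convert(pinnings, 1) = [false, true].
        List<Boolean> wire = PBHelper.convert(pinnings, 1);

        // DN0 (Receiver) rebuilds the array for its forwarding decision...
        boolean[] downstream = PBHelper.convertBooleanList(wire); // [false, true]

        // ...and DataXceiver forwards pinning = downstream[0] (false, for DN1);
        // Sender strips index 0 again, so DN1 -> DN2 finally carries
        // pinning = true with an empty tail.
        System.out.println("DN1 pinning = " + downstream[0]);
      }
    }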

+ 11 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -132,6 +132,8 @@ class BlockReceiver implements Closeable {
   private long lastResponseTime = 0;
   private boolean isReplaceBlock = false;
   private DataOutputStream replyOut = null;
+  
+  private boolean pinning;
 
   BlockReceiver(final ExtendedBlock block, final StorageType storageType,
       final DataInputStream in,
@@ -141,7 +143,8 @@ class BlockReceiver implements Closeable {
       final String clientname, final DatanodeInfo srcDataNode,
       final DataNode datanode, DataChecksum requestedChecksum,
       CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException {
+      final boolean allowLazyPersist,
+      final boolean pinning) throws IOException {
     try{
       this.block = block;
       this.in = in;
@@ -165,12 +168,14 @@ class BlockReceiver implements Closeable {
       this.isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
           || stage == BlockConstructionStage.TRANSFER_FINALIZED;
 
+      this.pinning = pinning;
       if (LOG.isDebugEnabled()) {
         LOG.debug(getClass().getSimpleName() + ": " + block
             + "\n  isClient  =" + isClient + ", clientname=" + clientname
             + "\n  isDatanode=" + isDatanode + ", srcDataNode=" + srcDataNode
             + "\n  inAddr=" + inAddr + ", myAddr=" + myAddr
             + "\n  cachingStrategy = " + cachingStrategy
+            + "\n  pinning=" + pinning
             );
       }
 
@@ -1279,6 +1284,11 @@ class BlockReceiver implements Closeable {
           : 0;
       block.setNumBytes(replicaInfo.getNumBytes());
       datanode.data.finalizeBlock(block);
+      
+      if (pinning) {
+        datanode.data.setPinning(block);
+      }
+      
       datanode.closeBlock(
           block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
       if (ClientTraceLog.isInfoEnabled() && isClient) {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -2068,7 +2068,7 @@ public class DataNode extends ReconfigurableBase
         new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
             clientname, targets, targetStorageTypes, srcNode,
             stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy,
-            false);
+            false, false, null);
 
         // send data & checksum
         blockSender.sendBlock(out, unbufOut, null);

+ 26 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -581,7 +581,9 @@ class DataXceiver extends Receiver implements Runnable {
       final long latestGenerationStamp,
       DataChecksum requestedChecksum,
       CachingStrategy cachingStrategy,
-      final boolean allowLazyPersist) throws IOException {
+      final boolean allowLazyPersist,
+      final boolean pinning,
+      final boolean[] targetPinnings) throws IOException {
     previousOpClientName = clientname;
     updateCurrentThreadName("Receiving block " + block);
     final boolean isDatanode = clientname.length() == 0;
@@ -594,14 +596,14 @@ class DataXceiver extends Receiver implements Runnable {
       throw new IOException(stage + " does not support multiple targets "
           + Arrays.asList(targets));
     }
-
+    
     if (LOG.isDebugEnabled()) {
       LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname 
       		+ "\n  block  =" + block + ", newGs=" + latestGenerationStamp
       		+ ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]"
           + "\n  targets=" + Arrays.asList(targets)
           + "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode
-          );
+          + ", pinning=" + pinning);
       LOG.debug("isDatanode=" + isDatanode
           + ", isClient=" + isClient
           + ", isTransfer=" + isTransfer);
@@ -643,7 +645,7 @@ class DataXceiver extends Receiver implements Runnable {
             peer.getLocalAddressString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
             clientname, srcDataNode, datanode, requestedChecksum,
-            cachingStrategy, allowLazyPersist);
+            cachingStrategy, allowLazyPersist, pinning);
 
         storageUuid = blockReceiver.getStorageUuid();
       } else {
@@ -686,10 +688,19 @@ class DataXceiver extends Receiver implements Runnable {
           mirrorIn = new DataInputStream(unbufMirrorIn);
 
           // Do not propagate allowLazyPersist to downstream DataNodes.
-          new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
+          if (targetPinnings != null && targetPinnings.length > 0) {
+            new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
               blockToken, clientname, targets, targetStorageTypes, srcDataNode,
               stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
-              latestGenerationStamp, requestedChecksum, cachingStrategy, false);
+              latestGenerationStamp, requestedChecksum, cachingStrategy,
+              false, targetPinnings[0], targetPinnings);
+          } else {
+            new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
+              blockToken, clientname, targets, targetStorageTypes, srcDataNode,
+              stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+              latestGenerationStamp, requestedChecksum, cachingStrategy,
+              false, false, targetPinnings);
+          }
 
           mirrorOut.flush();
 
@@ -949,7 +960,14 @@ class DataXceiver extends Receiver implements Runnable {
       }
 
     }
-
+    
+    if (datanode.data.getPinning(block)) {
+      String msg = "Not able to copy block " + block.getBlockId() + " " +
+          "to " + peer.getRemoteAddressString() + " because it's pinned ";
+      LOG.info(msg);
+      sendResponse(ERROR, msg);
+    }
+    
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
       String msg = "Not able to copy block " + block.getBlockId() + " " +
          "to " + peer.getRemoteAddressString() + " because threads " +
@@ -1109,7 +1127,7 @@ class DataXceiver extends Receiver implements Runnable {
             proxyReply, proxySock.getRemoteSocketAddress().toString(),
             proxySock.getLocalSocketAddress().toString(),
             null, 0, 0, 0, "", null, datanode, remoteChecksum,
-            CachingStrategy.newDropBehind(), false);
+            CachingStrategy.newDropBehind(), false, false);
         
         // receive a block
         blockReceiver.receiveBlock(null, null, replyOut, null, 

+ 13 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

@@ -522,4 +522,17 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
      */
    public ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
         StorageType targetStorageType) throws IOException;
+
+  /**
+   * Set a block to be pinned on this datanode so that it cannot be moved
+   * by Balancer/Mover.
+   *
+   * It is a no-op when dfs.datanode.block-pinning.enabled is set to false.
+   */
+  public void setPinning(ExtendedBlock block) throws IOException;
+
+  /**
+   * Check whether the block was pinned
+   */
+  public boolean getPinning(ExtendedBlock block) throws IOException;
 }

+ 40 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -50,6 +50,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
@@ -239,6 +243,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   // Used for synchronizing access to usage stats
   private final Object statsLock = new Object();
 
+  final LocalFileSystem localFS;
+
+  private boolean blockPinningEnabled;
+  
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -299,6 +307,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     lazyWriter = new Daemon(new LazyWriter(conf));
     lazyWriter.start();
     registerMBean(datanode.getDatanodeUuid());
+    localFS = FileSystem.getLocal(conf);
+    blockPinningEnabled = conf.getBoolean(
+      DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED,
+      DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT);
   }
 
   private void addVolume(Collection<StorageLocation> dataLocations,
@@ -2842,5 +2854,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       shouldRun = false;
     }
   }
+  
+  @Override
+  public void setPinning(ExtendedBlock block) throws IOException {
+    if (!blockPinningEnabled) {
+      return;
+    }
+
+    File f = getBlockFile(block);
+    Path p = new Path(f.getAbsolutePath());
+    
+    FsPermission oldPermission = localFS.getFileStatus(
+        new Path(f.getAbsolutePath())).getPermission();
+    //sticky bit is used for pinning purpose
+    FsPermission permission = new FsPermission(oldPermission.getUserAction(),
+        oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
+    localFS.setPermission(p, permission);
+  }
+  
+  @Override
+  public boolean getPinning(ExtendedBlock block) throws IOException {
+    if (!blockPinningEnabled) {
+      return  false;
+    }
+    File f = getBlockFile(block);
+        
+    FileStatus fss = localFS.getFileStatus(new Path(f.getAbsolutePath()));
+    return fss.getPermission().getStickyBit();
+  }
 }
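
Because the pin marker is just the sticky bit on the local block file, the mechanism can be exercised outside a DataNode. A standalone sketch against a scratch file (not part of the commit; the path is illustrative), mirroring setPinning()/getPinning() above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class StickyBitPinSketch {
      public static void main(String[] args) throws Exception {
        LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
        Path p = new Path("/tmp/blk_1234");  // stand-in for a block file
        localFS.create(p).close();

        // "Pin": rewrite the permission with the sticky bit set, as
        // FsDatasetImpl.setPinning() does above.
        FsPermission old = localFS.getFileStatus(p).getPermission();
        localFS.setPermission(p, new FsPermission(old.getUserAction(),
            old.getGroupAction(), old.getOtherAction(), true /* sticky */));

        // "Is it pinned?": read the sticky bit back, as getPinning() does.
        FileStatus fss = localFS.getFileStatus(p);
        System.out.println("pinned = " + fss.getPermission().getStickyBit());
      }
    }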
 
 

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto

@@ -123,6 +123,9 @@ message OpWriteBlockProto {
    * to ignore this hint.
    */
   optional bool allowLazyPersist = 13 [default = false];
+  //whether to pin the block, so Balancer won't move it.
+  optional bool pinning = 14 [default = false];
+  repeated bool targetPinnings = 15;
 }
   
 message OpTransferBlockProto {

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -2264,4 +2264,10 @@
   </description>
 </property>
 
+  <property>
+    <name>dfs.datanode.block-pinning.enabled</name>
+    <value>false</value>
+    <description>Whether pin blocks on favored DataNode.</description>
+  </property>
+
 </configuration>
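
The switch is off by default and is read on the DataNode side (see FsDatasetImpl above). For tests or embedded clusters it can be set programmatically, exactly as the new TestBalancer case below does; on a real cluster it belongs in the DataNodes' hdfs-site.xml:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class EnablePinningSketch {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Enable replica pinning for DataNodes started from this conf.
        conf.setBoolean(DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
        System.out.println(conf.getBoolean(
            DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, false)); // true
      }
    }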

+ 22 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -293,13 +293,21 @@ public class DFSTestUtil {
   public static void createFile(FileSystem fs, Path fileName, int bufferLen,
       long fileLen, long blockSize, short replFactor, long seed)
       throws IOException {
-    createFile(fs, fileName, false, bufferLen, fileLen, blockSize,
-            replFactor, seed, false);
+    createFile(fs, fileName, false, bufferLen, fileLen, blockSize, replFactor,
+      seed, false);
   }
 
   public static void createFile(FileSystem fs, Path fileName,
       boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
       short replFactor, long seed, boolean flush) throws IOException {
+        createFile(fs, fileName, isLazyPersist, bufferLen, fileLen, blockSize,
+          replFactor, seed, flush, null);
+  }
+
+  public static void createFile(FileSystem fs, Path fileName,
+      boolean isLazyPersist, int bufferLen, long fileLen, long blockSize,
+      short replFactor, long seed, boolean flush,
+      InetSocketAddress[] favoredNodes) throws IOException {
   assert bufferLen > 0;
   if (!fs.mkdirs(fileName.getParent())) {
       throw new IOException("Mkdirs failed to create " +
@@ -312,10 +320,19 @@ public class DFSTestUtil {
     createFlags.add(LAZY_PERSIST);
   }
   try {
-      out = fs.create(fileName, FsPermission.getFileDefault(), createFlags,
-        fs.getConf().getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+    if (favoredNodes == null) {
+      out = fs.create(
+        fileName,
+        FsPermission.getFileDefault(),
+        createFlags,
+        fs.getConf().getInt(
+          CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
         replFactor, blockSize, null);
-
+    } else {
+      out = ((DistributedFileSystem) fs).create(fileName,
+        FsPermission.getDefault(), true, bufferLen, replFactor, blockSize,
+        null, favoredNodes);
+    }
       if (fileLen > 0) {
         byte[] toWrite = new byte[bufferLen];
         Random rb = new Random(seed);

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

@@ -528,6 +528,6 @@ public class TestDataTransferProtocol {
         BlockTokenSecretManager.DUMMY_TOKEN, "cl",
         new DatanodeInfo[1], new StorageType[1], null, stage,
         0, block.getNumBytes(), block.getNumBytes(), newGS,
-        checksum, CachingStrategy.newDefaultStrategy(), false);
+        checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
   }
 }

+ 60 - 11
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java

@@ -17,12 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
 import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
 import static org.junit.Assert.assertEquals;
@@ -33,6 +28,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.URI;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -59,12 +55,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
@@ -309,6 +301,63 @@ public class TestBalancer {
     waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0);
   }
   
+  /**
+   * Make sure that balancer can't move pinned blocks.
+   * If specified favoredNodes when create file, blocks will be pinned use 
+   * sticky bit.
+   * @throws Exception
+   */
+  @Test(timeout=100000)
+  public void testBalancerWithPinnedBlocks() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    conf.setBoolean(DFS_DATANODE_BLOCK_PINNING_ENABLED, true);
+    
+    long[] capacities =  new long[] { CAPACITY, CAPACITY };
+    String[] racks = { RACK0, RACK1 };
+    int numOfDatanodes = capacities.length;
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
+      .hosts(new String[]{"localhost", "localhost"})
+      .racks(racks).simulatedCapacities(capacities).build();
+
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+      
+      // fill up the cluster to be 80% full
+      long totalCapacity = sum(capacities);
+      long totalUsedSpace = totalCapacity * 8 / 10;
+      InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
+      for (int i = 0; i < favoredNodes.length; i++) {
+        favoredNodes[i] = cluster.getDataNodes().get(i).getXferAddress();
+      }
+
+      DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
+          totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE,
+          (short) numOfDatanodes, 0, false, favoredNodes);
+      
+      // start up an empty node with the same capacity
+      cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 },
+          new long[] { CAPACITY });
+      
+      totalCapacity += CAPACITY;
+      
+      // run balancer and validate results
+      waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
+
+      // start rebalancing
+      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+      int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
+      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+      
+    } finally {
+      cluster.shutdown();
+    }
+    
+  }
+  
   /**
    * Wait until balanced: each datanode gives utilization within 
    * BALANCE_ALLOWED_VARIANCE of average

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

@@ -127,6 +127,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     SimulatedOutputStream oStream = null;
     private long bytesAcked;
     private long bytesRcvd;
+    private boolean pinned = false;
     BInfo(String bpid, Block b, boolean forWriting) throws IOException {
       theBlock = new Block(b);
       if (theBlock.getNumBytes() < 0) {
@@ -1285,5 +1286,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     // TODO Auto-generated method stub
     return null;
   }
+  
+  @Override
+  public void setPinning(ExtendedBlock b) throws IOException {
+    blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned = true;
+  }
+  
+  @Override
+  public boolean getPinning(ExtendedBlock b) throws IOException {
+    return blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned;
+  }
 }
 
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

@@ -152,7 +152,7 @@ public class TestDiskError {
         BlockTokenSecretManager.DUMMY_TOKEN, "",
         new DatanodeInfo[0], new StorageType[0], null,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
-        checksum, CachingStrategy.newDefaultStrategy(), false);
+        checksum, CachingStrategy.newDefaultStrategy(), false, false, null);
     out.flush();
 
     // close the connection before sending the content of the block

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java

@@ -399,4 +399,13 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   public long getNumBlocksFailedToUncache() {
     return 0;
   }
+
+  @Override
+  public void setPinning(ExtendedBlock block) throws IOException {    
+  }
+
+  @Override
+  public boolean getPinning(ExtendedBlock block) throws IOException {
+    return false;
+  }
 }