
HDFS-7369. Erasure coding: distribute recovery work for striped blocks to DataNode. Contributed by Zhe Zhang.

Zhe Zhang, 10 years ago
parent
commit
57a84c0d14
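
The core of this change is that the NameNode now tracks, for each at-risk striped block group, which internal block indices are no longer reported by any DataNode, and ships that list to a DataNode together with the surviving source nodes. Below is a minimal standalone sketch of that bookkeeping; the 6 data + 3 parity layout and the plain java.util types are illustrative assumptions, not the HdfsConstants values or NameNode data structures used by the patch.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MissingIndexSketch {
  // Illustrative layout: 6 data + 3 parity blocks per striped group (assumed example values).
  static final short NUM_DATA_BLOCKS = 6;
  static final short NUM_PARITY_BLOCKS = 3;

  /** Return the indices of a striped group that no live storage currently reports. */
  static List<Short> missingIndices(Set<Short> healthyIndices) {
    List<Short> missing = new ArrayList<>();
    for (short i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
      if (!healthyIndices.contains(i)) {
        missing.add(i);          // this index must be reconstructed
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    // Indices 2 and 7 are not reported by any DataNode in this example.
    Set<Short> healthy = new HashSet<>();
    for (short i : new short[] {0, 1, 3, 4, 5, 6, 8}) {
      healthy.add(i);
    }
    System.out.println("Missing indices: " + missingIndices(healthy)); // [2, 7]
  }
}
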

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java

@@ -86,4 +86,9 @@ public interface BlockCollection {
    * @return whether the block collection is under construction.
    */
   public boolean isUnderConstruction();
+
+  /**
+   * @return whether the block collection is in striping format
+   */
+  public boolean isStriped();
 }

+ 208 - 82
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
@@ -534,9 +535,9 @@ public class BlockManager {
     
     
     NumberReplicas numReplicas = new NumberReplicas();
     // source node returned is not used
-    chooseSourceDatanode(block, containingNodes,
+    chooseSourceDatanodes(getStoredBlock(block), containingNodes,
         containingLiveReplicasNodes, numReplicas,
-        UnderReplicatedBlocks.LEVEL);
+        new LinkedList<Short>(), 1, UnderReplicatedBlocks.LEVEL);
     
     
     // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are 
     // not included in the numReplicas.liveReplicas() count
@@ -1337,15 +1338,15 @@ public class BlockManager {
   }
 
   /**
-   * Scan blocks in {@link #neededReplications} and assign replication
-   * work to data-nodes they belong to.
+   * Scan blocks in {@link #neededReplications} and assign recovery
+   * (replication or erasure coding) work to data-nodes they belong to.
    *
    * The number of process blocks equals either twice the number of live
    * data-nodes or the number of under-replicated blocks whichever is less.
    *
    * @return number of blocks scheduled for replication during this iteration.
    */
-  int computeReplicationWork(int blocksToProcess) {
+  int computeBlockRecoveryWork(int blocksToProcess) {
     List<List<BlockInfo>> blocksToReplicate = null;
     namesystem.writeLock();
     try {
@@ -1355,30 +1356,32 @@ public class BlockManager {
     } finally {
       namesystem.writeUnlock();
     }
-    return computeReplicationWorkForBlocks(blocksToReplicate);
+    return computeRecoveryWorkForBlocks(blocksToReplicate);
   }
 
-  /** Replicate a set of blocks
+  /**
+   * Recover a set of blocks to full strength through replication or
+   * erasure coding
    *
-   * @param blocksToReplicate blocks to be replicated, for each priority
+   * @param blocksToRecover blocks to be recovered, for each priority
    * @return the number of blocks scheduled for replication
    */
   @VisibleForTesting
-  int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) {
+  int computeRecoveryWorkForBlocks(List<List<BlockInfo>> blocksToRecover) {
     int requiredReplication, numEffectiveReplicas;
     List<DatanodeDescriptor> containingNodes;
-    DatanodeDescriptor srcNode;
     BlockCollection bc = null;
     int additionalReplRequired;
 
     int scheduledWork = 0;
-    List<ReplicationWork> work = new LinkedList<ReplicationWork>();
+    List<BlockRecoveryWork> recovWork = new LinkedList<>();
 
 
+    // Step 1: categorize at-risk blocks into replication and EC tasks
     namesystem.writeLock();
     try {
       synchronized (neededReplications) {
-        for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
-          for (BlockInfo block : blocksToReplicate.get(priority)) {
+        for (int priority = 0; priority < blocksToRecover.size(); priority++) {
+          for (BlockInfo block : blocksToRecover.get(priority)) {
             // block should belong to a file
             bc = blocksMap.getBlockCollection(block);
             // abandoned block or block reopened for append
@@ -1392,25 +1395,31 @@ public class BlockManager {
             requiredReplication = bc.getPreferredBlockReplication();
 
             // get a source data-node
-            containingNodes = new ArrayList<DatanodeDescriptor>();
-            List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
+            containingNodes = new ArrayList<>();
+            List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
             NumberReplicas numReplicas = new NumberReplicas();
-            srcNode = chooseSourceDatanode(
+            List<Short> missingBlockIndices = new LinkedList<>();
+            DatanodeDescriptor[] srcNodes;
+            int numSourceNodes = bc.isStriped() ?
+                HdfsConstants.NUM_DATA_BLOCKS : 1;
+            srcNodes = chooseSourceDatanodes(
                 block, containingNodes, liveReplicaNodes, numReplicas,
-                priority);
-            if(srcNode == null) { // block can not be replicated from any node
-              LOG.debug("Block " + block + " cannot be repl from any node");
+                missingBlockIndices, numSourceNodes, priority);
+            if(srcNodes == null || srcNodes.length == 0) {
+              // block can not be replicated from any node
+              LOG.debug("Block " + block + " cannot be recovered " +
+                  "from any node");
               continue;
             }
 
-            // liveReplicaNodes can include READ_ONLY_SHARED replicas which are 
+            // liveReplicaNodes can include READ_ONLY_SHARED replicas which are
             // not included in the numReplicas.liveReplicas() count
             assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
 
             // do not schedule more if enough replicas is already pending
             numEffectiveReplicas = numReplicas.liveReplicas() +
                                     pendingReplications.getNumReplicas(block);
-      
+
             if (numEffectiveReplicas >= requiredReplication) {
               if ( (pendingReplications.getNumReplicas(block) > 0) ||
                    (blockHasEnoughRacks(block)) ) {
@@ -1427,9 +1436,21 @@ public class BlockManager {
             } else {
               additionalReplRequired = 1; // Needed on a new rack
             }
-            work.add(new ReplicationWork(block, bc, srcNode,
-                containingNodes, liveReplicaNodes, additionalReplRequired,
-                priority));
+            if (bc.isStriped()) {
+              ErasureCodingWork ecw = new ErasureCodingWork(block, bc, srcNodes,
+                  containingNodes, liveReplicaNodes, additionalReplRequired,
+                  priority);
+              short[] missingBlockArray = new short[missingBlockIndices.size()];
+              for (int i = 0 ; i < missingBlockIndices.size(); i++) {
+                missingBlockArray[i] = missingBlockIndices.get(i);
+              }
+              ecw.setMissingBlockIndices(missingBlockArray);
+              recovWork.add(ecw);
+            } else {
+              recovWork.add(new ReplicationWork(block, bc, srcNodes,
+                  containingNodes, liveReplicaNodes, additionalReplRequired,
+                  priority));
+            }
           }
         }
       }
@@ -1437,8 +1458,9 @@ public class BlockManager {
       namesystem.writeUnlock();
     }
 
+    // Step 2: choose target nodes for each recovery task
     final Set<Node> excludedNodes = new HashSet<Node>();
-    for(ReplicationWork rw : work){
+    for(BlockRecoveryWork rw : recovWork){
       // Exclude all of the containing nodes from being targets.
       // This list includes decommissioning or corrupt nodes.
       excludedNodes.clear();
@@ -1452,9 +1474,10 @@ public class BlockManager {
       rw.chooseTargets(blockplacement, storagePolicySuite, excludedNodes);
     }
 
+    // Step 3: add tasks to the DN
     namesystem.writeLock();
     try {
-      for(ReplicationWork rw : work){
+      for(BlockRecoveryWork rw : recovWork){
         final DatanodeStorageInfo[] targets = rw.targets;
         if(targets == null || targets.length == 0){
           rw.targets = null;
@@ -1493,7 +1516,7 @@ public class BlockManager {
 
 
           if ( (numReplicas.liveReplicas() >= requiredReplication) &&
                (!blockHasEnoughRacks(block)) ) {
-            if (rw.srcNode.getNetworkLocation().equals(
+            if (rw.srcNodes[0].getNetworkLocation().equals(
                 targets[0].getDatanodeDescriptor().getNetworkLocation())) {
               //No use continuing, unless a new rack in this case
               continue;
@@ -1501,7 +1524,17 @@ public class BlockManager {
           }
 
           // Add block to the to be replicated list
-          rw.srcNode.addBlockToBeReplicated(block, targets);
+          if (bc.isStriped()) {
+            assert rw instanceof ErasureCodingWork;
+            assert rw.targets.length > 0;
+            rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
+                new ExtendedBlock(namesystem.getBlockPoolId(), block),
+                rw.srcNodes, rw.targets,
+                ((ErasureCodingWork)rw).getMissingBlockIndicies());
+          }
+          else {
+            rw.srcNodes[0].addBlockToBeReplicated(block, targets);
+          }
           scheduledWork++;
           DatanodeStorageInfo.incrementBlocksScheduled(targets);
 
 
@@ -1525,7 +1558,7 @@ public class BlockManager {
 
 
     if (blockLog.isInfoEnabled()) {
       // log which blocks have been scheduled for replication
-      for(ReplicationWork rw : work){
+      for(BlockRecoveryWork rw : recovWork){
         DatanodeStorageInfo[] targets = rw.targets;
         if (targets != null && targets.length != 0) {
           StringBuilder targetList = new StringBuilder("datanode(s)");
@@ -1533,7 +1566,7 @@ public class BlockManager {
             targetList.append(' ');
             targetList.append(targets[k].getDatanodeDescriptor());
           }
-          blockLog.info("BLOCK* ask {} to replicate {} to {}", rw.srcNode,
+          blockLog.info("BLOCK* ask {} to replicate {} to {}", rw.srcNodes,
               rw.block, targetList);
         }
       }
@@ -1619,55 +1652,66 @@ public class BlockManager {
   }
 
   /**
-   * Parse the data-nodes the block belongs to and choose one,
-   * which will be the replication source.
+   * Parse the data-nodes the block belongs to and choose a certain number
+   * from them to be the recovery sources.
    *
    * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
    * since the former do not have write traffic and hence are less busy.
    * We do not use already decommissioned nodes as a source.
-   * Otherwise we choose a random node among those that did not reach their
-   * replication limits.  However, if the replication is of the highest priority
-   * and all nodes have reached their replication limits, we will choose a
-   * random node despite the replication limit.
+   * Otherwise we randomly choose nodes among those that did not reach their
+   * replication limits. However, if the recovery work is of the highest
+   * priority and all nodes have reached their replication limits, we will
+   * randomly choose the desired number of nodes despite the replication limit.
    *
    * In addition form a list of all nodes containing the block
    * and calculate its replication numbers.
    *
    * @param block Block for which a replication source is needed
-   * @param containingNodes List to be populated with nodes found to contain the 
-   *                        given block
-   * @param nodesContainingLiveReplicas List to be populated with nodes found to
-   *                                    contain live replicas of the given block
-   * @param numReplicas NumberReplicas instance to be initialized with the 
-   *                                   counts of live, corrupt, excess, and
-   *                                   decommissioned replicas of the given
-   *                                   block.
+   * @param containingNodes List to be populated with nodes found to contain
+   *                        the given block
+   * @param nodesContainingLiveReplicas List to be populated with nodes found
+   *                                    to contain live replicas of the given
+   *                                    block
+   * @param numReplicas NumberReplicas instance to be initialized with the
+   *                    counts of live, corrupt, excess, and decommissioned
+   *                    replicas of the given block.
+   * @param missingBlockIndices List to be populated with indices of missing
+   *                            blocks in a striped block group or missing
+   *                            replicas of a replicated block
+   * @param numSourceNodes integer specifying the number of source nodes to
+   *                       choose
    * @param priority integer representing replication priority of the given
    *                 block
-   * @return the DatanodeDescriptor of the chosen node from which to replicate
-   *         the given block
-   */
-   @VisibleForTesting
-   DatanodeDescriptor chooseSourceDatanode(Block block,
-       List<DatanodeDescriptor> containingNodes,
-       List<DatanodeStorageInfo>  nodesContainingLiveReplicas,
-       NumberReplicas numReplicas,
-       int priority) {
+   * @return the array of DatanodeDescriptor of the chosen nodes from which to
+   *         recover the given block
+   */
+  @VisibleForTesting
+  DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
+      List<DatanodeDescriptor> containingNodes,
+      List<DatanodeStorageInfo> nodesContainingLiveReplicas,
+      NumberReplicas numReplicas,
+      List<Short> missingBlockIndices, int numSourceNodes, int priority) {
     containingNodes.clear();
     nodesContainingLiveReplicas.clear();
-    DatanodeDescriptor srcNode = null;
+    LinkedList<DatanodeDescriptor> srcNodes = new LinkedList<>();
     int live = 0;
     int decommissioned = 0;
     int decommissioning = 0;
     int corrupt = 0;
     int excess = 0;
-    
+    missingBlockIndices.clear();
+    Set<Short> healthyIndices = new HashSet<>();
+
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
     for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
+      if (block.isStriped()) {
+        healthyIndices.add((short) ((BlockInfoStriped) block).
+            getStorageBlockIndex(storage));
+      }
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       LightWeightLinkedSet<Block> excessBlocks =
         excessReplicateMap.get(node.getDatanodeUuid());
-      int countableReplica = storage.getState() == State.NORMAL ? 1 : 0; 
+      int countableReplica = storage.getState() == State.NORMAL ? 1 : 0;
       if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
         corrupt += countableReplica;
       else if (node.isDecommissionInProgress()) {
@@ -1703,20 +1747,32 @@ public class BlockManager {
         continue;
 
       // We got this far, current node is a reasonable choice
-      if (srcNode == null) {
-        srcNode = node;
+      if(srcNodes.size() < numSourceNodes) {
+        srcNodes.add(node);
         continue;
       }
       // switch to a different node randomly
       // this to prevent from deterministically selecting the same node even
       // if the node failed to replicate the block on previous iterations
-      if(ThreadLocalRandom.current().nextBoolean())
-        srcNode = node;
+      if(ThreadLocalRandom.current().nextBoolean()) {
+        int pos = ThreadLocalRandom.current().nextInt(numSourceNodes);
+        if(!srcNodes.get(pos).isDecommissionInProgress()) {
+          srcNodes.set(pos, node);
+        }
+      }
+    }
+    if (block.isStriped()) {
+      for (short i = 0; i < HdfsConstants.NUM_DATA_BLOCKS +
+          HdfsConstants.NUM_PARITY_BLOCKS; i++) {
+        if (!healthyIndices.contains(i)) {
+          missingBlockIndices.add(i);
+        }
+      }
     }
     }
     if(numReplicas != null)
     if(numReplicas != null)
       numReplicas.initialize(live, decommissioned, decommissioning, corrupt,
       numReplicas.initialize(live, decommissioned, decommissioning, corrupt,
           excess, 0);
           excess, 0);
-    return srcNode;
+    return srcNodes.toArray(new DatanodeDescriptor[srcNodes.size()]);
   }
   }
 
 
   /**
   /**
@@ -1751,7 +1807,7 @@ public class BlockManager {
        */
        */
     }
   }
-  
+
   /**
   /**
    * StatefulBlockInfo is used to build the "toUC" list, which is a list of
    * updates to the information about under-construction blocks.
@@ -3716,7 +3772,7 @@ public class BlockManager {
   }
   }
 
   /**
-   * Periodically calls computeReplicationWork().
+   * Periodically calls computeBlockRecoveryWork().
    */
    */
   private class ReplicationMonitor implements Runnable {
 
@@ -3774,7 +3830,7 @@ public class BlockManager {
     final int nodesToProcess = (int) Math.ceil(numlive
     final int nodesToProcess = (int) Math.ceil(numlive
         * this.blocksInvalidateWorkPct);
 
-    int workFound = this.computeReplicationWork(blocksToProcess);
+    int workFound = this.computeBlockRecoveryWork(blocksToProcess);
 
 
     // Update counters
     // Update counters
     namesystem.writeLock();
         null);
         null);
   }
 
-  private static class ReplicationWork {
-
-    private final BlockInfo block;
-    private final BlockCollection bc;
+  /**
+   * This class is used internally by {@link this#computeRecoveryWorkForBlocks}
+   * to represent a task to recover a block through replication or erasure
+   * coding. Recovery is done by transferring data from {@link srcNodes} to
+   * {@link targets}
+   */
+  private static class BlockRecoveryWork {
+    protected final BlockInfo block;
+    protected final BlockCollection bc;
 
 
-    private final DatanodeDescriptor srcNode;
-    private final List<DatanodeDescriptor> containingNodes;
-    private final List<DatanodeStorageInfo> liveReplicaStorages;
-    private final int additionalReplRequired;
+    /**
+     * An erasure coding recovery task has multiple source nodes.
+     * A replication task only has 1 source node, stored on top of the array
+     */
+    protected final DatanodeDescriptor[] srcNodes;
+    /** Nodes containing the block; avoid them in choosing new targets */
+    protected final List<DatanodeDescriptor> containingNodes;
+    /** Required by {@link BlockPlacementPolicy#chooseTarget} */
+    protected final List<DatanodeStorageInfo> liveReplicaStorages;
+    protected final int additionalReplRequired;
 
 
-    private DatanodeStorageInfo targets[];
-    private final int priority;
+    protected DatanodeStorageInfo[] targets;
+    protected final int priority;
 
 
-    public ReplicationWork(BlockInfo block,
+    public BlockRecoveryWork(BlockInfo block,
         BlockCollection bc,
-        DatanodeDescriptor srcNode,
+        DatanodeDescriptor[] srcNodes,
         List<DatanodeDescriptor> containingNodes,
         List<DatanodeStorageInfo> liveReplicaStorages,
         int additionalReplRequired,
         int priority) {
       this.block = block;
       this.bc = bc;
       this.bc = bc;
-      this.srcNode = srcNode;
-      this.srcNode.incrementPendingReplicationWithoutTargets();
+      this.srcNodes = srcNodes;
       this.containingNodes = containingNodes;
       this.liveReplicaStorages = liveReplicaStorages;
       this.additionalReplRequired = additionalReplRequired;
       this.priority = priority;
       this.targets = null;
     }
     }
-    
-    private void chooseTargets(BlockPlacementPolicy blockplacement,
+
+    protected void chooseTargets(BlockPlacementPolicy blockplacement,
+        BlockStoragePolicySuite storagePolicySuite,
+        Set<Node> excludedNodes) {
+    }
+  }
+
+  private static class ReplicationWork extends BlockRecoveryWork {
+
+    public ReplicationWork(BlockInfo block,
+        BlockCollection bc,
+        DatanodeDescriptor[] srcNodes,
+        List<DatanodeDescriptor> containingNodes,
+        List<DatanodeStorageInfo> liveReplicaStorages,
+        int additionalReplRequired,
+        int priority) {
+      super(block, bc, srcNodes, containingNodes,
+          liveReplicaStorages, additionalReplRequired, priority);
+      LOG.debug("Creating a ReplicationWork to recover " + block);
+    }
+
+    protected void chooseTargets(BlockPlacementPolicy blockplacement,
+        BlockStoragePolicySuite storagePolicySuite,
+        Set<Node> excludedNodes) {
+      assert srcNodes.length > 0
+          : "At least 1 source node should have been selected";
+      try {
+        targets = blockplacement.chooseTarget(bc.getName(),
+            additionalReplRequired, srcNodes[0], liveReplicaStorages, false,
+            excludedNodes, block.getNumBytes(),
+            storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
+      } finally {
+        srcNodes[0].decrementPendingReplicationWithoutTargets();
+      }
+    }
+  }
+
+  private static class ErasureCodingWork extends BlockRecoveryWork {
+
+    private short[] missingBlockIndicies = null;
+
+    public ErasureCodingWork(BlockInfo block,
+        BlockCollection bc,
+        DatanodeDescriptor[] srcNodes,
+        List<DatanodeDescriptor> containingNodes,
+        List<DatanodeStorageInfo> liveReplicaStorages,
+        int additionalReplRequired,
+        int priority) {
+      super(block, bc, srcNodes, containingNodes,
+          liveReplicaStorages, additionalReplRequired, priority);
+      LOG.debug("Creating an ErasureCodingWork to recover " + block);
+    }
+
+    public short[] getMissingBlockIndicies() {
+      return missingBlockIndicies;
+    }
+
+    public void setMissingBlockIndices(short[] missingBlockIndicies) {
+      this.missingBlockIndicies = missingBlockIndicies;
+    }
+
+    protected void chooseTargets(BlockPlacementPolicy blockplacement,
         BlockStoragePolicySuite storagePolicySuite,
         Set<Node> excludedNodes) {
       try {
       try {
+        // TODO: new placement policy for EC considering multiple writers
         targets = blockplacement.chooseTarget(bc.getName(),
-            additionalReplRequired, srcNode, liveReplicaStorages, false,
+            additionalReplRequired, srcNodes[0], liveReplicaStorages, false,
             excludedNodes, block.getNumBytes(),
             storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
       } finally {
       } finally {
-        srcNode.decrementPendingReplicationWithoutTargets();
       }
     }
   }
   }

+ 68 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java

@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.Set;
+import java.util.Arrays;
 
 
 import com.google.common.annotations.VisibleForTesting;
 
 
@@ -41,6 +42,7 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -97,6 +99,33 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
 
+  /** Block and targets pair */
+  @InterfaceAudience.Private
+  @InterfaceStability.Evolving
+  public static class BlockECRecoveryInfo {
+    public final ExtendedBlock block;
+    public final DatanodeDescriptor[] sources;
+    public final DatanodeStorageInfo[] targets;
+    public final short[] missingBlockIndices;
+
+    BlockECRecoveryInfo(ExtendedBlock block, DatanodeDescriptor[] sources,
+        DatanodeStorageInfo[] targets, short[] missingBlockIndices) {
+      this.block = block;
+      this.sources = sources;
+      this.targets = targets;
+      this.missingBlockIndices = missingBlockIndices;
+    }
+
+    @Override
+    public String toString() {
+      return new StringBuilder().append("BlockECRecoveryInfo(\n  ").
+          append("Recovering ").append(block).
+          append(" From: ").append(Arrays.asList(sources)).
+          append(" To: ").append(Arrays.asList(targets)).append(")\n").
+          toString();
+    }
+  }
+
   /** A BlockTargetPair queue. */
   private static class BlockQueue<E> {
     private final Queue<E> blockq = new LinkedList<E>();
     private final Queue<E> blockq = new LinkedList<E>();
@@ -217,12 +246,17 @@ public class DatanodeDescriptor extends DatanodeInfo {
   private long bandwidth;
 
   /** A queue of blocks to be replicated by this datanode */
   /** A queue of blocks to be replicated by this datanode */
-  private final BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<BlockTargetPair>();
+  private final BlockQueue<BlockTargetPair> replicateBlocks =
+      new BlockQueue<>();
+  /** A queue of blocks to be erasure coded by this datanode */
+  private final BlockQueue<BlockECRecoveryInfo> erasurecodeBlocks =
+      new BlockQueue<>();
   /** A queue of blocks to be recovered by this datanode */
-  private final BlockQueue<BlockInfoContiguousUnderConstruction> recoverBlocks =
-                                new BlockQueue<BlockInfoContiguousUnderConstruction>();
+  private final BlockQueue<BlockInfoContiguousUnderConstruction>
+      recoverBlocks = new BlockQueue<>();
   /** A set of blocks to be invalidated by this datanode */
-  private final LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>();
+  private final LightWeightHashSet<Block> invalidateBlocks =
+      new LightWeightHashSet<>();
 
 
   /* Variables for maintaining number of blocks scheduled to be written to
    * this storage. This count is approximate and might be slightly bigger
    * this storage. This count is approximate and might be slightly bigger
@@ -375,6 +409,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
       this.invalidateBlocks.clear();
       this.recoverBlocks.clear();
       this.replicateBlocks.clear();
       this.replicateBlocks.clear();
+      this.erasurecodeBlocks.clear();
     }
     // pendingCached, cached, and pendingUncached are protected by the
     // FSN lock.
     // FSN lock.
@@ -596,6 +631,20 @@ public class DatanodeDescriptor extends DatanodeInfo {
     replicateBlocks.offer(new BlockTargetPair(block, targets));
   }
 
 
+  /**
+   * Store block erasure coding work.
+   */
+  void addBlockToBeErasureCoded(ExtendedBlock block, DatanodeDescriptor[] sources,
+      DatanodeStorageInfo[] targets, short[] missingBlockIndicies) {
+    assert(block != null && sources != null && sources.length > 0);
+    BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets,
+        missingBlockIndicies);
+    erasurecodeBlocks.offer(task);
+    BlockManager.LOG.debug("Adding block recovery task " + task +
+        "to " + getName() + ", current queue size is " +
+        erasurecodeBlocks.size());
+  }
+
   /**
    * Store block recovery work.
    */
    */
@@ -627,6 +676,13 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return PendingReplicationWithoutTargets + replicateBlocks.size();
   }
 
 
+  /**
+   * The number of work items that are pending to be erasure coded
+   */
+  int getNumberOfBlocksToBeErasureCoded() {
+    return erasurecodeBlocks.size();
+  }
+
   /**
   /**
    * The number of block invalidation items that are pending to 
    * The number of block invalidation items that are pending to 
    * be sent to the datanode
    * be sent to the datanode
@@ -641,6 +697,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return replicateBlocks.poll(maxTransfers);
   }
 
 
+  public List<BlockECRecoveryInfo> getErasureCodeCommand(int maxTransfers) {
+    return erasurecodeBlocks.poll(maxTransfers);
+  }
+
   public BlockInfoContiguousUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) {
     List<BlockInfoContiguousUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
     if(blocks == null)
     if(blocks == null)
@@ -841,6 +901,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
     if (repl > 0) {
       sb.append(" ").append(repl).append(" blocks to be replicated;");
     }
     }
+    int ec = erasurecodeBlocks.size();
+    if(ec > 0) {
+      sb.append(" ").append(ec).append(" blocks to be erasure coded;");
+    }
     int inval = invalidateBlocks.size();
     int inval = invalidateBlocks.size();
     if (inval > 0) {
     if (inval > 0) {
       sb.append(" ").append(inval).append(" blocks to be invalidated;");      
       sb.append(" ").append(inval).append(" blocks to be invalidated;");      

+ 14 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -1349,7 +1350,7 @@ public class DatanodeManager {
       VolumeFailureSummary volumeFailureSummary) throws IOException {
     synchronized (heartbeatManager) {
       synchronized (datanodeMap) {
       synchronized (datanodeMap) {
-        DatanodeDescriptor nodeinfo = null;
+        DatanodeDescriptor nodeinfo;
         try {
           nodeinfo = getDatanode(nodeReg);
         } catch(UnregisteredNodeException e) {
         } catch(UnregisteredNodeException e) {
@@ -1387,10 +1388,10 @@ public class DatanodeManager {
             final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations();
             // Skip stale nodes during recovery - not heart beated for some time (30s by default).
             final List<DatanodeStorageInfo> recoveryLocations =
             final List<DatanodeStorageInfo> recoveryLocations =
-                new ArrayList<DatanodeStorageInfo>(storages.length);
-            for (int i = 0; i < storages.length; i++) {
-              if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) {
-                recoveryLocations.add(storages[i]);
+                new ArrayList<>(storages.length);
+            for (DatanodeStorageInfo storage : storages) {
+              if (!storage.getDatanodeDescriptor().isStale(staleInterval)) {
+                recoveryLocations.add(storage);
               }
             }
             // If we are performing a truncate recovery than set recovery fields
             // If we are performing a truncate recovery than set recovery fields
@@ -1429,7 +1430,7 @@ public class DatanodeManager {
           return new DatanodeCommand[] { brCommand };
         }
 
 
-        final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
+        final List<DatanodeCommand> cmds = new ArrayList<>();
         //check pending replication
         List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
               maxTransfers);
               maxTransfers);
@@ -1437,6 +1438,13 @@ public class DatanodeManager {
           cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
               pendingList));
         }
         }
+        // checking pending erasure coding tasks
+        List<BlockECRecoveryInfo> pendingECList =
+            nodeinfo.getErasureCodeCommand(maxTransfers);
+        if (pendingECList != null) {
+          cmds.add(new BlockECRecoveryCommand(DatanodeProtocol.DNA_CODEC,
+              pendingECList));
+        }
         //check block invalidation
         Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
         if (blks != null) {

+ 5 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

@@ -419,7 +419,8 @@ public class INodeFile extends INodeWithAdditionalFields
       }
       max = maxInSnapshot > max ? maxInSnapshot : max;
     }
     }
-    return max;
+    return isStriped()?
+        HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS : max;
   }
 
   /** Set the replication factor of this file. */
   /** Set the replication factor of this file. */
@@ -1107,11 +1108,12 @@ public class INodeFile extends INodeWithAdditionalFields
         Arrays.asList(snapshotBlocks).contains(block);
   }
 
 
-  @VisibleForTesting
   /**
    * @return true if the file is in the striping layout.
    */
    */
-  // TODO: move erasure coding policy to file XAttr (HDFS-7337)
+  @VisibleForTesting
+  @Override
+  // TODO: move erasure coding policy to file XAttr
   public boolean isStriped() {
     return getStoragePolicyID() == HdfsConstants.EC_STORAGE_POLICY_ID;
   }

+ 63 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java

@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import com.google.common.base.Joiner;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
+
+import java.util.Collection;
+
+/**
+ * A BlockECRecoveryCommand is an instruction to a DataNode to reconstruct a
+ * striped block group with missing blocks.
+ *
+ * Upon receiving this command, the DataNode pulls data from other DataNodes
+ * hosting blocks in this group and reconstructs the lost blocks through codec
+ * calculation.
+ *
+ * After the reconstruction, the DataNode pushes the reconstructed blocks to
+ * their final destinations if necessary (e.g., the destination is different
+ * from the reconstruction node, or multiple blocks in a group are to be
+ * reconstructed).
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockECRecoveryCommand extends DatanodeCommand {
+  final Collection<BlockECRecoveryInfo> ecTasks;
+
+  /**
+   * Create BlockECRecoveryCommand from a collection of
+   * {@link BlockECRecoveryInfo}, each representing a recovery task
+   */
+  public BlockECRecoveryCommand(int action,
+      Collection<BlockECRecoveryInfo> blockECRecoveryInfoList) {
+    super(action);
+    this.ecTasks = blockECRecoveryInfoList;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("BlockECRecoveryCommand(\n  ");
+    Joiner.on("\n  ").appendTo(sb, ecTasks);
+    sb.append("\n)");
+    return sb.toString();
+  }
+}
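
The reconstruction step described above is a codec calculation over the surviving blocks of the group. The toy sketch below illustrates the idea with single-parity XOR rather than the Reed-Solomon style codec HDFS erasure coding actually uses; the block contents and sizes are made up for illustration only.

import java.util.Arrays;

public class XorRecoverySketch {
  /** Rebuild one lost block as the XOR of all surviving blocks (single-parity toy codec). */
  static byte[] reconstruct(byte[][] survivors) {
    byte[] lost = new byte[survivors[0].length];
    for (byte[] src : survivors) {
      for (int i = 0; i < lost.length; i++) {
        lost[i] ^= src[i];               // XOR accumulates the missing cell values
      }
    }
    return lost;
  }

  public static void main(String[] args) {
    byte[] d0 = {1, 2, 3, 4};
    byte[] d1 = {5, 6, 7, 8};
    byte[] parity = new byte[4];
    for (int i = 0; i < 4; i++) {
      parity[i] = (byte) (d0[i] ^ d1[i]); // encode: parity = d0 XOR d1
    }
    // Pretend d1 was lost; rebuild it from d0 and the parity block.
    byte[] rebuilt = reconstruct(new byte[][] {d0, parity});
    System.out.println(Arrays.equals(rebuilt, d1)); // true
  }
}
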

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java

@@ -76,6 +76,7 @@ public interface DatanodeProtocol {
   final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth
   final static int DNA_CACHE = 9;      // cache blocks
   final static int DNA_UNCACHE = 10;   // uncache blocks
   final static int DNA_UNCACHE = 10;   // uncache blocks
+  final static int DNA_CODEC = 11;   // erasure coding recovery
 
 
   /** 
    * Register Datanode.

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java

@@ -161,7 +161,7 @@ public class BlockManagerTestUtil {
    */
   public static int computeAllPendingWork(BlockManager bm) {
     int work = computeInvalidationWork(bm);
     int work = computeInvalidationWork(bm);
-    work += bm.computeReplicationWork(Integer.MAX_VALUE);
+    work += bm.computeBlockRecoveryWork(Integer.MAX_VALUE);
     return work;
   }
 
 

+ 11 - 11
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

@@ -453,8 +453,8 @@ public class TestBlockManager {
     assertEquals("Block not initially pending replication", 0,
         bm.pendingReplications.getNumReplicas(block));
     assertEquals(
     assertEquals(
-        "computeReplicationWork should indicate replication is needed", 1,
-        bm.computeReplicationWorkForBlocks(list_all));
+        "computeBlockRecoveryWork should indicate replication is needed", 1,
+        bm.computeRecoveryWorkForBlocks(list_all));
     assertTrue("replication is pending after work is computed",
         bm.pendingReplications.getNumReplicas(block) > 0);
 
 
@@ -508,22 +508,22 @@ public class TestBlockManager {
     assertNotNull("Chooses source node for a highest-priority replication"
         + " even if all available source nodes have reached their replication"
         + " limits below the hard limit.",
         + " limits below the hard limit.",
-        bm.chooseSourceDatanode(
-            aBlock,
+        bm.chooseSourceDatanodes(
+            bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
             new NumberReplicas(),
             new NumberReplicas(),
-            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
+            new LinkedList<Short>(), 1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
 
 
     assertNull("Does not choose a source node for a less-than-highest-priority"
         + " replication since all available source nodes have reached"
         + " their replication limits.",
         + " their replication limits.",
-        bm.chooseSourceDatanode(
-            aBlock,
+        bm.chooseSourceDatanodes(
+            bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
             new NumberReplicas(),
             new NumberReplicas(),
-            UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED));
+            new LinkedList<Short>(), 1, UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED)[0]);
 
 
     // Increase the replication count to test replication count > hard limit
     DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
     DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
@@ -531,12 +531,12 @@ public class TestBlockManager {
 
 
     assertNull("Does not choose a source node for a highest-priority"
         + " replication when all available nodes exceed the hard limit.",
         + " replication when all available nodes exceed the hard limit.",
-        bm.chooseSourceDatanode(
-            aBlock,
+        bm.chooseSourceDatanodes(
+            bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
             new NumberReplicas(),
             new NumberReplicas(),
-            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
+            new LinkedList<Short>(), 1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
   }
 
   @Test

+ 107 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRecoverStripedBlocks.java

@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.EC_STORAGE_POLICY_NAME;
+import static org.junit.Assert.assertTrue;
+
+public class TestRecoverStripedBlocks {
+  private final short GROUP_SIZE =
+      HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS;
+  private final short NUM_OF_DATANODES = GROUP_SIZE + 1;
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private DistributedFileSystem fs;
+  private static final int BLOCK_SIZE = 1024;
+  private HdfsAdmin dfsAdmin;
+  private FSNamesystem namesystem;
+  private Path ECFilePath;
+
+  @Before
+  public void setupCluster() throws IOException {
+    conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    // Large value to make sure the pending replication request can stay in
+    // DatanodeDescriptor.replicateBlocks before test timeout.
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100);
+    // Make sure BlockManager can pull all blocks from UnderReplicatedBlocks via
+    // chooseUnderReplicatedBlocks at once.
+    conf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5);
+
+    cluster = new MiniDFSCluster.Builder(conf).
+        numDataNodes(NUM_OF_DATANODES).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+    dfsAdmin = new HdfsAdmin(cluster.getURI(), conf);
+    namesystem = cluster.getNamesystem();
+    ECFilePath = new Path("/ecfile");
+    DFSTestUtil.createFile(fs, ECFilePath, 4 * BLOCK_SIZE, GROUP_SIZE, 0);
+    dfsAdmin.setStoragePolicy(ECFilePath, EC_STORAGE_POLICY_NAME);
+  }
+
+  @Test
+  public void testMissingStripedBlock() throws Exception {
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+    ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, ECFilePath);
+    Iterator<DatanodeStorageInfo> storageInfos =
+        bm.blocksMap.getStorages(b.getLocalBlock())
+            .iterator();
+
+    DatanodeDescriptor firstDn = storageInfos.next().getDatanodeDescriptor();
+    Iterator<BlockInfo> it = firstDn.getBlockIterator();
+    int missingBlkCnt = 0;
+    while (it.hasNext()) {
+      BlockInfo blk = it.next();
+      BlockManager.LOG.debug("Block " + blk + " will be lost");
+      missingBlkCnt++;
+    }
+    BlockManager.LOG.debug("Missing in total " + missingBlkCnt + " blocks");
+
+    bm.getDatanodeManager().removeDatanode(firstDn);
+
+    bm.computeDatanodeWork();
+
+    short cnt = 0;
+    for (DataNode dn : cluster.getDataNodes()) {
+      DatanodeDescriptor dnDescriptor =
+          bm.getDatanodeManager().getDatanode(dn.getDatanodeUuid());
+      cnt += dnDescriptor.getNumberOfBlocksToBeErasureCoded();
+    }
+
+    assertTrue("Counting the number of outstanding EC tasks", cnt == missingBlkCnt);
+  }
+}

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java

@@ -1162,7 +1162,7 @@ public class TestReplicationPolicy {
     when(mockNS.hasWriteLock()).thenReturn(true);
     BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
     UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
     UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
-    
+
     BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
     BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());