
HDFS-8328. Follow-on to update decode for DataNode striped blocks reconstruction. (yliu)

yliu, 10 years ago
parent commit a31eada33a

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt

@@ -280,3 +280,6 @@
 
    HDFS-7621. Erasure Coding: update the Balancer/Mover data migration logic.
    (Walter Su via zhz)
+
+    HDFS-8328. Follow-on to update decode for DataNode striped blocks
+    reconstruction. (yliu)

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -373,7 +373,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
  public static final String  DFS_DATANODE_STRIPED_READ_THREADS_KEY = "dfs.datanode.stripedread.threads";
  public static final int     DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT = 20;
  public static final String  DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.stripedread.buffer.size";
-  public static final int     DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 256 * 1024;
+  public static final int     DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 64 * 1024;
  public static final String  DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY = "dfs.datanode.stripedread.threshold.millis";
  public static final int     DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT = 5000; //5s
  public static final String  DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface";
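
For context, a minimal sketch (not part of this commit) of how these keys are consumed on the DataNode side; the wrapper class is hypothetical, the key constants are the ones defined above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class StripedReadConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // After this change the default buffer drops from 256 KiB to 64 KiB;
        // a recovery task allocates roughly one such buffer per source read
        // and per target, so the smaller default bounds memory use.
        int bufferSize = conf.getInt(
            DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
            DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT);
        int threads = conf.getInt(
            DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_KEY,
            DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT);
        System.out.println("stripedread: buffer=" + bufferSize
            + " bytes, threads=" + threads);
      }
    }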

+ 247 - 174
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java

@@ -70,9 +70,7 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
-import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
-import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Daemon;
@@ -80,6 +78,8 @@ import org.apache.hadoop.util.DataChecksum;
 
 import com.google.common.base.Preconditions;
 
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.convertIndex4Decode;
+
 /**
  * ErasureCodingWorker handles the erasure coding recovery work commands. These
  * commands would be issued from Namenode as part of Datanode's heart beat
@@ -110,10 +110,6 @@ public final class ErasureCodingWorker {
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
        DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT);
  }
-
-  private RawErasureEncoder newEncoder(int numDataUnits, int numParityUnits) {
-    return new RSRawEncoder(numDataUnits, numParityUnits);
-  }
  
  private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) {
    return new RSRawDecoder(numDataUnits, numParityUnits);
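
With the encoder path removed, every reconstruction now goes through the decoder. A hedged sketch (not part of this commit) of the decode contract as this file uses it: survivors fill a (dataBlkNum + parityBlkNum)-slot input array in decode order, erased slots stay null, and decode() writes one output buffer per erased index. The RS-6-3 schema and buffer sizes are illustrative only.

    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;

    public class DecodeContractSketch {
      public static void main(String[] args) {
        RawErasureDecoder decoder = new RSRawDecoder(6, 3);
        ByteBuffer[] inputs = new ByteBuffer[6 + 3];
        for (int i = 0; i < inputs.length; i++) {
          if (i != 2) {                       // pretend decode slot 2 is erased
            inputs[i] = ByteBuffer.allocate(64);   // survivor data
          }
        }
        int[] erasedIndices = {2};            // slots to reconstruct
        ByteBuffer[] outputs = {ByteBuffer.allocate(64)};
        decoder.decode(inputs, erasedIndices, outputs);  // fills outputs[0]
      }
    }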
@@ -221,14 +217,14 @@ public final class ErasureCodingWorker {
    private final int parityBlkNum;
    private final int cellSize;
    
-    private RawErasureEncoder encoder;
    private RawErasureDecoder decoder;

    // Striped read buffer size
    private int bufferSize;

    private final ExtendedBlock blockGroup;
-    // position in striped block
+    private final int minRequiredSources;
+    // position in striped internal block
    private long positionInBlock;

    // sources
@@ -237,6 +233,10 @@

    private final List<StripedReader> stripedReaders;

+    // The buffers and indices for striped blocks whose length is 0
+    private ByteBuffer[] zeroStripeBuffers;
+    private short[] zeroStripeIndices;
+
    // targets
    private final DatanodeInfo[] targets;
    private final StorageType[] targetStorageTypes;
@@ -272,21 +272,32 @@
      cellSize = recoveryInfo.getCellSize();

      blockGroup = recoveryInfo.getExtendedBlock();
+      final int cellsNum = (int)((blockGroup.getNumBytes() - 1) / cellSize + 1);
+      minRequiredSources = Math.min(cellsNum, dataBlkNum);

      liveIndices = recoveryInfo.getLiveBlockIndices();
      sources = recoveryInfo.getSourceDnInfos();
      stripedReaders = new ArrayList<>(sources.length);

-      Preconditions.checkArgument(liveIndices.length >= dataBlkNum,
+      Preconditions.checkArgument(liveIndices.length >= minRequiredSources,
          "Not enough live striped blocks.");
      Preconditions.checkArgument(liveIndices.length == sources.length,
          "liveBlockIndices and source dns should match");

+      if (minRequiredSources < dataBlkNum) {
+        zeroStripeBuffers = 
+            new ByteBuffer[dataBlkNum - minRequiredSources];
+        zeroStripeIndices = new short[dataBlkNum - minRequiredSources];
+      }
+
      targets = recoveryInfo.getTargetDnInfos();
      targetStorageTypes = recoveryInfo.getTargetStorageTypes();
      targetIndices = new short[targets.length];
      targetBuffers = new ByteBuffer[targets.length];

+      Preconditions.checkArgument(targetIndices.length <= parityBlkNum,
+          "Too many missing striped blocks.");
+
      targetSockets = new Socket[targets.length];
      targetOutputStreams = new DataOutputStream[targets.length];
      targetInputStreams = new DataInputStream[targets.length];
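
A worked example of the minimum-sources arithmetic introduced above (illustrative values; an RS-6-3 schema with 64 KiB cells is assumed):

    // A block group holding a 1-byte file:
    long numBytes = 1;                                        // blockGroup.getNumBytes()
    int cellSize = 64 * 1024;
    int dataBlkNum = 6, parityBlkNum = 3;
    int cellsNum = (int) ((numBytes - 1) / cellSize + 1);     // = 1
    int minRequiredSources = Math.min(cellsNum, dataBlkNum);  // = 1
    // The remaining five data positions hold no bytes at all; they are
    // covered by zeroStripeBuffers/zeroStripeIndices rather than real reads.
    int zeroStripes = dataBlkNum - minRequiredSources;        // = 5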
@@ -303,6 +314,10 @@
      cachingStrategy = CachingStrategy.newDefaultStrategy();
    }

+    private ByteBuffer allocateBuffer(int length) {
+      return ByteBuffer.allocate(length);
+    }
+
    private ExtendedBlock getBlock(ExtendedBlock blockGroup, int i) {
      return StripedBlockUtil.constructInternalBlock(blockGroup, cellSize,
          dataBlkNum, i);
@@ -313,37 +328,67 @@
          cellSize, dataBlkNum, i);
    }

+    /**
+     * StripedReader is used to read from one source DN; it holds a block
+     * reader, a buffer and the striped block index.
+     * A StripedReader is allocated only once per source, and the readers
+     * are kept in the same array order as sources. Typically only the
+     * minimum number (minRequiredSources) of StripedReaders is allocated;
+     * a new one is added for a new source DN if an existing DN turns out
+     * to be invalid or slow.
+     * If a source DN is corrupt, its blockReader is set to null and is
+     * never read from again.
+     *
+     * @param i the array index of sources
+     * @param offsetInBlock offset for the internal block
+     * @return StripedReader
+     */
+    private StripedReader addStripedReader(int i, long offsetInBlock) {
+      StripedReader reader = new StripedReader(liveIndices[i]);
+      stripedReaders.add(reader);
+
+      BlockReader blockReader = newBlockReader(
+          getBlock(blockGroup, liveIndices[i]), offsetInBlock, sources[i]);
+      if (blockReader != null) {
+        initChecksumAndBufferSizeIfNeeded(blockReader);
+        reader.blockReader = blockReader;
+      }
+      reader.buffer = allocateBuffer(bufferSize);
+      return reader;
+    }
+
    @Override
    public void run() {
      datanode.incrementXmitsInProgress();
      try {
-        // Store the indices of successfully read source
-        // This will be updated after doing real read.
-        int[] success = new int[dataBlkNum];
+        // Store the array indices of the source DNs we have read from
+        // successfully. In each read iteration the success list may be
+        // updated if some source DN is corrupt or slow; the updated list
+        // is then used for the next iteration.
+        int[] success = new int[minRequiredSources];

        int nsuccess = 0;
-        for (int i = 0; i < sources.length && nsuccess < dataBlkNum; i++) {
-          StripedReader reader = new StripedReader(liveIndices[i]);
-          stripedReaders.add(reader);
-
-          BlockReader blockReader = newBlockReader(
-              getBlock(blockGroup, liveIndices[i]), 0, sources[i]);
-          if (blockReader != null) {
-            initChecksumAndBufferSizeIfNeeded(blockReader);
-            reader.blockReader = blockReader;
-            reader.buffer = ByteBuffer.allocate(bufferSize);
+        for (int i = 0; 
+            i < sources.length && nsuccess < minRequiredSources; i++) {
+          StripedReader reader = addStripedReader(i, 0);
+          if (reader.blockReader != null) {
            success[nsuccess++] = i;
          }
        }

-        if (nsuccess < dataBlkNum) {
+        if (nsuccess < minRequiredSources) {
          String error = "Can't find minimum sources required by "
              + "recovery, block id: " + blockGroup.getBlockId();
          throw new IOException(error);
        }

+        if (zeroStripeBuffers != null) {
+          for (int i = 0; i < zeroStripeBuffers.length; i++) {
+            zeroStripeBuffers[i] = allocateBuffer(bufferSize);
+          }
+        }
+
        for (int i = 0; i < targets.length; i++) {
-          targetBuffers[i] = ByteBuffer.allocate(bufferSize);
+          targetBuffers[i] = allocateBuffer(bufferSize);
        }

        checksumSize = checksum.getChecksumSize();
@@ -356,7 +401,9 @@
        packetBuf = new byte[maxPacketSize];
        checksumBuf = new byte[checksumSize * (bufferSize / bytesPerChecksum)];

-        // Store whether the target is success
+        // targetsStatus stores whether each target is still healthy. A
+        // failed target (invalid DN or failed transfer) is recorded once,
+        // and no more data is transferred to it.
        boolean[] targetsStatus = new boolean[targets.length];
        if (initTargetStreams(targetsStatus) == 0) {
          String error = "All targets are failed.";
@@ -367,16 +414,11 @@
        while (positionInBlock < firstStripedBlockLength) {
          int toRead = Math.min(
              bufferSize, (int)(firstStripedBlockLength - positionInBlock));
-          // step1: read minimum striped buffer size data required by recovery.
-          nsuccess = readMinimumStripedData4Recovery(success);
-
-          if (nsuccess < dataBlkNum) {
-            String error = "Can't read data from minimum number of sources "
-                + "required by recovery, block id: " + blockGroup.getBlockId();
-            throw new IOException(error);
-          }
+          // step1: read from the minimum number of source DNs required for
+          //   reconstruction. The returned success list holds the source DNs
+          //   we actually read from.
+          success = readMinimumStripedData4Recovery(success);

-          // step2: encode/decode to recover targets
+          // step2: decode to reconstruct targets
          long remaining = firstStripedBlockLength - positionInBlock;
          int toRecoverLen = remaining < bufferSize ? 
              (int)remaining : bufferSize;
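
The per-iteration arithmetic above, with illustrative numbers (a 64 KiB read buffer against a 100 KiB internal block takes two passes):

    int bufferSize = 64 * 1024;
    long firstStripedBlockLength = 100 * 1024;
    long positionInBlock = 0;
    while (positionInBlock < firstStripedBlockLength) {
      // 64 KiB on the first pass, the 36 KiB remainder on the second
      int toRecoverLen = (int) Math.min(
          bufferSize, firstStripedBlockLength - positionInBlock);
      positionInBlock += toRecoverLen;
    }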
@@ -426,65 +468,97 @@
      }
    }

-    // assume liveIndices is not ordered.
    private void getTargetIndices() {
      BitSet bitset = new BitSet(dataBlkNum + parityBlkNum);
      for (int i = 0; i < sources.length; i++) {
        bitset.set(liveIndices[i]);
      }
      int m = 0;
-      for (int i = 0; i < dataBlkNum + parityBlkNum && m < targets.length; i++) {
+      int k = 0;
+      for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
        if (!bitset.get(i)) {
-          targetIndices[m++] = (short)i;
+          if (getBlockLen(blockGroup, i) > 0) {
+            if (m < targets.length) {
+              targetIndices[m++] = (short)i;
+            }
+          } else {
+            zeroStripeIndices[k++] = (short)i;
+          }
        }
      }
    }

+    private long getReadLength(int index) {
+      long blockLen = getBlockLen(blockGroup, index);
+      long remaining = blockLen - positionInBlock;
+      return remaining > bufferSize ? bufferSize : remaining;
+    }
+
    /**
-     * Read minimum striped buffer size data required by recovery.
-     * <code>success</code> list will be updated after read.
+     * Read from the minimum number of source DNs required for reconstruction
+     * in this iteration. First try the DNs on the success list, which we
+     * believe are the best; if a source DN is corrupt or slow, try another
+     * source DN and update the success list.
+     *
+     * The updated success list is returned for subsequent operations and
+     * the next read iteration.
     * 
-     * Initially we only read from <code>dataBlkNum</code> sources, 
-     * if timeout or failure for some source, we will try to schedule 
-     * read from a new source. 
+     * @param success the initial success list of source DNs we think best
+     * @return the updated success list of source DNs we actually read from
+     * @throws IOException
     */
-    private int readMinimumStripedData4Recovery(int[] success) {
-
+    private int[] readMinimumStripedData4Recovery(final int[] success)
+        throws IOException {
+      int nsuccess = 0;
+      int[] newSuccess = new int[minRequiredSources];
      BitSet used = new BitSet(sources.length);
-      for (int i = 0; i < dataBlkNum; i++) {
+      /*
+       * Read from the minimum number of source DNs required; the success
+       * list contains the source DNs we believe are best.
+       */
+      for (int i = 0; i < minRequiredSources; i++) {
        StripedReader reader = stripedReaders.get(success[i]);
-        Callable<Void> readCallable = readFromBlock(
-            reader.blockReader, reader.buffer);
-        Future<Void> f = readService.submit(readCallable);
-        futures.put(f, success[i]);
+        if (getReadLength(liveIndices[success[i]]) > 0) {
+          Callable<Void> readCallable = readFromBlock(
+              reader.blockReader, reader.buffer);
+          Future<Void> f = readService.submit(readCallable);
+          futures.put(f, success[i]);
+        } else {
+          // If the read length is 0, we don't need to do a real read
+          reader.buffer.position(0);
+          newSuccess[nsuccess++] = success[i];
+        }
        used.set(success[i]);
      }

-      int nsuccess = 0;
      while (!futures.isEmpty()) {
        try {
          StripingChunkReadResult result =
              StripedBlockUtil.getNextCompletedStripedRead(
                  readService, futures, STRIPED_READ_THRESHOLD_MILLIS);
+          int resultIndex = -1;
          if (result.state == StripingChunkReadResult.SUCCESSFUL) {
-            success[nsuccess++] = result.index;
-            if (nsuccess >= dataBlkNum) {
-              // cancel remaining reads if we read successfully from minimum
-              // number of sources required for recovery.
-              cancelReads(futures.keySet());
-              futures.clear();
-              break;
-            }
+            resultIndex = result.index;
          } else if (result.state == StripingChunkReadResult.FAILED) {
-            // If read failed for some source, we should not use it anymore 
-            // and schedule read from a new source.
+            // If the read failed for some source DN, we should not use it
+            // any more; schedule a read from another source DN instead.
            StripedReader failedReader = stripedReaders.get(result.index);
            closeBlockReader(failedReader.blockReader);
            failedReader.blockReader = null;
-            scheduleNewRead(used);
+            resultIndex = scheduleNewRead(used);
          } else if (result.state == StripingChunkReadResult.TIMEOUT) {
            // If timeout, we also schedule a new read.
-            scheduleNewRead(used);
+            resultIndex = scheduleNewRead(used);
+          }
+          if (resultIndex >= 0) {
+            newSuccess[nsuccess++] = resultIndex;
+            if (nsuccess >= minRequiredSources) {
+              // cancel remaining reads if we read successfully from the
+              // minimum number of source DNs required by reconstruction.
+              cancelReads(futures.keySet());
+              futures.clear();
+              break;
+            }
          }
        } catch (InterruptedException e) {
          LOG.info("Read data interrupted.", e);
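
A worked illustration of why getReadLength() can legitimately return 0 (assumed RS-6-3 layout, illustrative lengths): for a 1-byte block group, only internal data block 0 and the three parity blocks carry bytes, so no real read is scheduled for the empty positions and their buffers simply stay zero-filled.

    long positionInBlock = 0;
    int bufferSize = 64 * 1024;
    // internal block lengths: data blocks 0-5, then parity blocks 6-8
    long[] blockLen = {1, 0, 0, 0, 0, 0, 1, 1, 1};
    for (int idx = 0; idx < blockLen.length; idx++) {
      long remaining = blockLen[idx] - positionInBlock;
      long readLen = remaining > bufferSize ? bufferSize : remaining;
      System.out.println("internal block " + idx + ": read " + readLen);
    }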
@@ -492,19 +566,13 @@
        }
      }

-      return nsuccess;
-    }
-
-    /**
-     * Return true if need to do encoding to recovery missed striped block.
-     */
-    private boolean shouldEncode(int[] success) {
-      for (int i = 0; i < success.length; i++) {
-        if (stripedReaders.get(success[i]).index >= dataBlkNum) {
-          return false;
-        }
+      if (nsuccess < minRequiredSources) {
+        String error = "Can't read data from the minimum number of sources "
+            + "required by reconstruction, block id: " + blockGroup.getBlockId();
+        throw new IOException(error);
      }
-      return true;
+
+      return newSuccess;
    }
    
    private void paddingBufferToLen(ByteBuffer buffer, int len) {
@@ -514,13 +582,6 @@
      }
    }
    
-    // Initialize encoder
-    private void initEncoderIfNecessary() {
-      if (encoder == null) {
-        encoder = newEncoder(dataBlkNum, parityBlkNum);
-      }
-    }
-    
    // Initialize decoder
    private void initDecoderIfNecessary() {
      if (decoder == null) {
@@ -528,119 +589,119 @@
      }
    }

+    private int[] getErasedIndices(boolean[] targetsStatus) {
+      int[] result = new int[targets.length];
+      int m = 0;
+      for (int i = 0; i < targets.length; i++) {
+        if (targetsStatus[i]) {
+          result[m++] = convertIndex4Decode(targetIndices[i], 
+              dataBlkNum, parityBlkNum);
+        }
+      }
+      return Arrays.copyOf(result, m);
+    }
+
    private void recoverTargets(int[] success, boolean[] targetsStatus,
        int toRecoverLen) {
-      if (shouldEncode(success)) {
-        initEncoderIfNecessary();
-        ByteBuffer[] dataBuffers = new ByteBuffer[dataBlkNum];
-        ByteBuffer[] parityBuffers = new ByteBuffer[parityBlkNum];
-        for (int i = 0; i < dataBlkNum; i++) {
-          StripedReader reader = stripedReaders.get(i);
-          ByteBuffer buffer = reader.buffer;
-          paddingBufferToLen(buffer, toRecoverLen);
-          dataBuffers[i] = (ByteBuffer)buffer.flip();
-        }
-        for (int i = dataBlkNum; i < stripedReaders.size(); i++) {
-          StripedReader reader = stripedReaders.get(i);
-          parityBuffers[reader.index - dataBlkNum] = cleanBuffer(reader.buffer);
-        }
-        for (int i = 0; i < targets.length; i++) {
-          parityBuffers[targetIndices[i] - dataBlkNum] = targetBuffers[i];
-        }
-        for (int i = 0; i < parityBlkNum; i++) {
-          if (parityBuffers[i] == null) {
-            parityBuffers[i] = ByteBuffer.allocate(toRecoverLen);
-          } else {
-            parityBuffers[i].limit(toRecoverLen);
-          }
-        }
-        encoder.encode(dataBuffers, parityBuffers);
-      } else {
-        /////////// TODO: wait for HADOOP-11847 /////////////
-        ////////// The current decode method always try to decode parityBlkNum number of data blocks. ////////////
-        initDecoderIfNecessary();
-        ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum];
-        for (int i = 0; i < success.length; i++) {
-          StripedReader reader = stripedReaders.get(success[i]);
-          ByteBuffer buffer = reader.buffer;
+      initDecoderIfNecessary();
+      ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum];
+      for (int i = 0; i < success.length; i++) {
+        StripedReader reader = stripedReaders.get(success[i]);
+        ByteBuffer buffer = reader.buffer;
+        paddingBufferToLen(buffer, toRecoverLen);
+        inputs[convertIndex4Decode(reader.index, dataBlkNum, parityBlkNum)] = 
+            (ByteBuffer)buffer.flip();
+      }
+      if (success.length < dataBlkNum) {
+        for (int i = 0; i < zeroStripeBuffers.length; i++) {
+          ByteBuffer buffer = zeroStripeBuffers[i];
          paddingBufferToLen(buffer, toRecoverLen);
-          int index = reader.index < dataBlkNum ? 
-              reader.index + parityBlkNum : reader.index - dataBlkNum;
+          int index = convertIndex4Decode(zeroStripeIndices[i], dataBlkNum,
+              parityBlkNum);
          inputs[index] = (ByteBuffer)buffer.flip();
        }
-        int[] indices4Decode = new int[parityBlkNum];
-        int m = 0;
-        for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
-          if (inputs[i] == null) {
-            inputs[i] = ByteBuffer.allocate(toRecoverLen);
-            indices4Decode[m++] = i;
-          }
-        }
-        ByteBuffer[] outputs = new ByteBuffer[parityBlkNum];
-        m = 0;
-        // targetIndices is subset of indices4Decode
-        for (int i = 0; i < parityBlkNum; i++) {
-          if (m < targetIndices.length && 
-              (indices4Decode[i] - parityBlkNum) == targetIndices[m]) {
-            outputs[i] = targetBuffers[m++];
-            outputs[i].limit(toRecoverLen);
-          } else {
-            outputs[i] = ByteBuffer.allocate(toRecoverLen);
-          }
+      }
+      int[] erasedIndices = getErasedIndices(targetsStatus);
+      ByteBuffer[] outputs = new ByteBuffer[erasedIndices.length];
+      int m = 0;
+      for (int i = 0; i < targetBuffers.length; i++) {
+        if (targetsStatus[i]) {
+          outputs[m++] = targetBuffers[i];
+          outputs[i].limit(toRecoverLen);
        }
-        
-        decoder.decode(inputs, indices4Decode, outputs);
-        
-        for (int i = 0; i < targets.length; i++) {
-          if (targetsStatus[i]) {
-            long blockLen = getBlockLen(blockGroup, targetIndices[i]);
-            long remaining = blockLen - positionInBlock;
-            if (remaining < 0) {
-              targetBuffers[i].limit(0);
-            } else if (remaining < toRecoverLen) {
-              targetBuffers[i].limit((int)remaining);
-            }
+      }
+      decoder.decode(inputs, erasedIndices, outputs);
+
+      for (int i = 0; i < targets.length; i++) {
+        if (targetsStatus[i]) {
+          long blockLen = getBlockLen(blockGroup, targetIndices[i]);
+          long remaining = blockLen - positionInBlock;
+          if (remaining < 0) {
+            targetBuffers[i].limit(0);
+          } else if (remaining < toRecoverLen) {
+            targetBuffers[i].limit((int)remaining);
          }
        }
      }
    }

-    /** 
-     * Schedule read from a new source, we first try un-initial source, 
-     * then try un-used source in this round and bypass failed source.
+    /**
+     * Schedule a read from a new source DN when the current one is corrupt
+     * or slow; called from the read iteration. Initially we may only have
+     * <code>minRequiredSources</code> StripedReaders.
+     * If the position is already at the end of the source block, no real
+     * read is needed and the array index of the source DN is returned
+     * directly; otherwise a read is scheduled and -1 is returned.
+     * 
+     * @param used the source DNs already used in this iteration.
+     * @return the array index of the source DN if no real read is needed,
+     *         otherwise -1.
     */
-    private void scheduleNewRead(BitSet used) {
+    private int scheduleNewRead(BitSet used) {
      StripedReader reader = null;
+      // step1: initially we may only have minRequiredSources StripedReaders,
+      // so there may be source DNs we have never read from; try to create a
+      // StripedReader for one new source DN and read from it. If found, go
+      // to step 3.
      int m = stripedReaders.size();
-      while (m < sources.length && reader == null) {
-        reader = new StripedReader(liveIndices[m]);
-        BlockReader blockReader = newBlockReader(
-            getBlock(blockGroup, liveIndices[m]), positionInBlock, sources[m]);
-        stripedReaders.add(reader);
-        if (blockReader != null) {
-          assert blockReader.getDataChecksum().equals(checksum);
-          reader.blockReader = blockReader;
-          reader.buffer = ByteBuffer.allocate(bufferSize);
+      while (reader == null && m < sources.length) {
+        reader = addStripedReader(m, positionInBlock);
+        if (getReadLength(liveIndices[m]) > 0) {
+          if (reader.blockReader == null) {
+            reader = null;
+            m++;
+          }
        } else {
-          m++;
-          reader = null;
+          used.set(m);
+          return m;
        }
      }

+      // step2: if there is no new source DN to use, try a source DN we have
+      // read from before but which, for some reason (e.g., being slow), was
+      // not on the success list at the beginning of this iteration; now we
+      // have a chance to revisit it.
      for (int i = 0; reader == null && i < stripedReaders.size(); i++) {
-        StripedReader r = stripedReaders.get(i);
-        if (r.blockReader != null && !used.get(i)) {
-          closeBlockReader(r.blockReader);
-          r.blockReader = newBlockReader(
-              getBlock(blockGroup, liveIndices[i]), positionInBlock,
-              sources[i]);
-          if (r.blockReader != null) {
-            m = i;
-            reader = r;
+        if (!used.get(i)) {
+          StripedReader r = stripedReaders.get(i);
+          if (getReadLength(liveIndices[i]) > 0) {
+            closeBlockReader(r.blockReader);
+            r.blockReader = newBlockReader(
+                getBlock(blockGroup, liveIndices[i]), positionInBlock,
+                sources[i]);
+            if (r.blockReader != null) {
+              m = i;
+              reader = r;
+            }
+          } else {
+            used.set(i);
+            r.buffer.position(0);
+            return i;
          }
        }
      }

+      // step3: schedule the read if we found a usable source DN that needs
+      // a real read.
      if (reader != null) {
        Callable<Void> readCallable = readFromBlock(
            reader.blockReader, reader.buffer);
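
The convertIndex4Decode helper centralizes the conversion the removed code computed inline; a sketch assuming it preserves that inline rule (storage order puts data blocks first, while the decoder expects parity first):

    static int convertIndex4Decode(int index, int dataBlkNum, int parityBlkNum) {
      // matches the removed expression:
      // index < dataBlkNum ? index + parityBlkNum : index - dataBlkNum
      return index < dataBlkNum ? index + parityBlkNum : index - dataBlkNum;
    }
    // e.g. RS-6-3: data index 0 -> slot 3, data index 5 -> slot 8,
    //              parity index 6 -> slot 0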
@@ -648,6 +709,8 @@
        futures.put(f, m);
        used.set(m);
      }
+
+      return -1;
    }

    // cancel all reads.
@@ -708,7 +771,10 @@
    }

    private BlockReader newBlockReader(final ExtendedBlock block, 
-        long startOffset, DatanodeInfo dnInfo) {
+        long offsetInBlock, DatanodeInfo dnInfo) {
+      if (offsetInBlock >= block.getNumBytes()) {
+        return null;
+      }
      try {
        InetSocketAddress dnAddr = getSocketAddress4Transfer(dnInfo);
        Token<BlockTokenIdentifier> blockToken = datanode.getBlockAccessToken(
@@ -720,7 +786,8 @@
         * requires config for domain-socket in UNIX or legacy config in Windows.
         */
        return RemoteBlockReader2.newBlockReader(
-            "dummy", block, blockToken, startOffset, block.getNumBytes(), true,
+            "dummy", block, blockToken, offsetInBlock, 
+            block.getNumBytes() - offsetInBlock, true,
            "", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo,
            null, cachingStrategy);
      } catch (IOException e) {
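
A toy illustration (values are examples only) of the resume arithmetic the two changes above implement: a reader created mid-recovery starts at positionInBlock and requests only the remaining bytes.

    long numBytes = 100 * 1024;      // block.getNumBytes()
    long offsetInBlock = 64 * 1024;  // current positionInBlock
    if (offsetInBlock >= numBytes) {
      // nothing left to read: newBlockReader() now returns null
    } else {
      long length = numBytes - offsetInBlock;  // 36 KiB, not the full 100 KiB
      System.out.println("read " + length + " bytes from " + offsetInBlock);
    }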
@@ -808,6 +875,12 @@
        }
      }

+      if (zeroStripeBuffers != null) {
+        for (int i = 0; i < zeroStripeBuffers.length; i++) {
+          zeroStripeBuffers[i].clear();
+        }
+      }
+
      for (int i = 0; i < targetBuffers.length; i++) {
        if (targetBuffers[i] != null) {
          cleanBuffer(targetBuffers[i]);
@@ -903,7 +976,7 @@
  }

  private static class StripedReader {
-    private final short index;
+    private final short index; // internal block index
    private BlockReader blockReader;
    private ByteBuffer buffer;


+ 54 - 11
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java

@@ -100,29 +100,69 @@ public class TestRecoverStripedFile {
  }
  
  @Test(timeout = 120000)
-  public void testRecoverThreeParityBlocks() throws Exception {
+  public void testRecoverOneParityBlock1() throws Exception {
+    int fileLen = cellSize + cellSize/10;
+    assertFileBlocksRecovery("/testRecoverOneParityBlock1", fileLen, 0, 1);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverOneParityBlock2() throws Exception {
+    int fileLen = 1;
+    assertFileBlocksRecovery("/testRecoverOneParityBlock2", fileLen, 0, 1);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverOneParityBlock3() throws Exception {
    int fileLen = 3 * blockSize + blockSize/10;
+    assertFileBlocksRecovery("/testRecoverOneParityBlock3", fileLen, 0, 1);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverThreeParityBlocks() throws Exception {
+    int fileLen = 10 * blockSize + blockSize/10;
    assertFileBlocksRecovery("/testRecoverThreeParityBlocks", fileLen, 0, 3);
  }
  
  @Test(timeout = 120000)
  public void testRecoverThreeDataBlocks() throws Exception {
-    int fileLen = 3 * blockSize + blockSize/10;
+    int fileLen = 10 * blockSize + blockSize/10;
    assertFileBlocksRecovery("/testRecoverThreeDataBlocks", fileLen, 1, 3);
  }
  
+  @Test(timeout = 120000)
+  public void testRecoverThreeDataBlocks1() throws Exception {
+    int fileLen = 3 * blockSize + blockSize/10;
+    assertFileBlocksRecovery("/testRecoverThreeDataBlocks1", fileLen, 1, 3);
+  }
+  
  @Test(timeout = 120000)
  public void testRecoverOneDataBlock() throws Exception {
-    ////TODO: TODO: wait for HADOOP-11847
-    //int fileLen = 10 * blockSize + blockSize/10;
-    //assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen, 1, 1);
+    int fileLen = 10 * blockSize + blockSize/10;
+    assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen, 1, 1);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverOneDataBlock1() throws Exception {
+    int fileLen = cellSize + cellSize/10;
+    assertFileBlocksRecovery("/testRecoverOneDataBlock1", fileLen, 1, 1);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverOneDataBlock2() throws Exception {
+    int fileLen = 1;
+    assertFileBlocksRecovery("/testRecoverOneDataBlock2", fileLen, 1, 1);
  }
  
  @Test(timeout = 120000)
  public void testRecoverAnyBlocks() throws Exception {
-    ////TODO: TODO: wait for HADOOP-11847
-    //int fileLen = 3 * blockSize + blockSize/10;
-    //assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen, 2, 2);
+    int fileLen = 3 * blockSize + blockSize/10;
+    assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen, 2, 2);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverAnyBlocks1() throws Exception {
+    int fileLen = 10 * blockSize + blockSize/10;
+    assertFileBlocksRecovery("/testRecoverAnyBlocks1", fileLen, 2, 3);
  }
  
  /**
@@ -203,6 +243,9 @@
      replicaContents[i] = readReplica(replicas[i]);
    }
    
+    int cellsNum = (fileLen - 1) / cellSize + 1;
+    int groupSize = Math.min(cellsNum, dataBlkNum) + parityBlkNum;
+
    try {
      DatanodeID[] dnIDs = new DatanodeID[toRecoverBlockNum];
      for (int i = 0; i < toRecoverBlockNum; i++) {
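
A worked example of the expected group size (illustrative; RS-6-3 with 64 KiB cells): a 1-byte file occupies a single cell, so only one data block plus the three parity blocks exist.

    int cellSize = 64 * 1024, dataBlkNum = 6, parityBlkNum = 3;
    int fileLen = 1;
    int cellsNum = (fileLen - 1) / cellSize + 1;                   // = 1
    int groupSize = Math.min(cellsNum, dataBlkNum) + parityBlkNum; // = 4
    // a 10-block file spans all data positions: groupSize = 6 + 3 = 9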
@@ -216,7 +259,6 @@
        dnIDs[i] = dn.getDatanodeId();
      }
      setDataNodesDead(dnIDs);
-       
      
      // Check the locatedBlocks of the file again
      locatedBlocks = getLocatedBlocks(file);
@@ -232,7 +274,7 @@
        }
      }
      
-      waitForRecoveryFinished(file);
+      waitForRecoveryFinished(file, groupSize);
      
      targetDNs = sortTargetsByReplicas(blocks, targetDNs);
      
@@ -319,7 +361,8 @@
    }
  }
  
-  private LocatedBlocks waitForRecoveryFinished(Path file) throws Exception {
+  private LocatedBlocks waitForRecoveryFinished(Path file, int groupSize) 
+      throws Exception {
    final int ATTEMPTS = 60;
    for (int i = 0; i < ATTEMPTS; i++) {
      LocatedBlocks locatedBlocks = getLocatedBlocks(file);