
HDFS-8481. Erasure coding: remove workarounds in client side stripped blocks recovering. Contributed by Zhe Zhang.

Zhe Zhang, 10 years ago
Parent commit: 014bd32c58

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt

@@ -262,3 +262,6 @@
 
     HDFS-8479. Erasure coding: fix striping related logic in FSDirWriteFileOp to
     sync with HDFS-8421. (Zhe Zhang via jing9)
+
+    HDFS-8481. Erasure coding: remove workarounds in client side stripped blocks 
+    recovering. (zhz)

+ 9 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.io.ByteBufferPool;
 
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.divideByteRangeIntoStripes;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.initDecodeInputs;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.finalizeDecodeInputs;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.decodeAndFillBuffer;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getNextCompletedStripedRead;
 import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getStartOffsetsForInternalBlocks;
@@ -41,6 +42,8 @@ import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult
 
 import org.apache.hadoop.io.erasurecode.ECSchema;
 
+import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
@@ -117,6 +120,8 @@ public class DFSStripedInputStream extends DFSInputStream {
   /** the buffer for a complete stripe */
   private ByteBuffer curStripeBuf;
   private final ECSchema schema;
+  private final RawErasureDecoder decoder;
+
   /**
    * indicate the start/end offset of the current buffered stripe in the
    * block group
@@ -139,6 +144,7 @@ public class DFSStripedInputStream extends DFSInputStream {
     curStripeRange = new StripeRange(0, 0);
     readingService =
         new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
+    decoder = new RSRawDecoder(dataBlkNum, parityBlkNum);
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("Creating an striped input stream for file " + src);
     }
@@ -591,8 +597,9 @@ public class DFSStripedInputStream extends DFSInputStream {
     }
 
     if (alignedStripe.missingChunksNum > 0) {
-      decodeAndFillBuffer(decodeInputs, buf, alignedStripe,
-          dataBlkNum, parityBlkNum);
+      finalizeDecodeInputs(decodeInputs, alignedStripe);
+      decodeAndFillBuffer(decodeInputs, buf, alignedStripe, parityBlkNum,
+          decoder);
     }
   }
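
Note on the DFSStripedInputStream change above: the RSRawDecoder is now built once in the stream's constructor and handed to decodeAndFillBuffer, instead of being re-allocated on every decode call. A minimal sketch of that reuse pattern (the class and method names below are illustrative, not from the patch):

    import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;

    // Illustrative stand-in: one decoder per stream, reused for every
    // stripe that turns out to have missing chunks.
    class StripeReaderSketch {
      private final RawErasureDecoder decoder;

      StripeReaderSketch(int dataBlkNum, int parityBlkNum) {
        // Constructed once here, mirroring the new constructor logic above.
        this.decoder = new RSRawDecoder(dataBlkNum, parityBlkNum);
      }

      void recoverStripe(byte[][] decodeInputs, int[] erasedIndexes,
          byte[][] decodeOutputs) {
        // The same decoder instance serves every recovery on this stream.
        decoder.decode(decodeInputs, erasedIndexes, decodeOutputs);
      }
    }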
 

+ 41 - 21
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
 
 import java.util.*;
 import java.io.IOException;
@@ -246,19 +247,36 @@ public class StripedBlockUtil {
 
   /**
    * Initialize the decoding input buffers based on the chunk states in an
-   * AlignedStripe
+   * {@link AlignedStripe}. For each chunk that was not initially requested,
+   * schedule a new fetch request with the decoding input buffer as transfer
+   * destination.
    */
   public static byte[][] initDecodeInputs(AlignedStripe alignedStripe,
       int dataBlkNum, int parityBlkNum) {
     byte[][] decodeInputs =
         new byte[dataBlkNum + parityBlkNum][(int) alignedStripe.getSpanInBlock()];
     for (int i = 0; i < alignedStripe.chunks.length; i++) {
-      StripingChunk chunk = alignedStripe.chunks[i];
-      if (chunk == null) {
+      if (alignedStripe.chunks[i] == null) {
         alignedStripe.chunks[i] = new StripingChunk(decodeInputs[i]);
         alignedStripe.chunks[i].offsetsInBuf.add(0);
         alignedStripe.chunks[i].lengthsInBuf.add((int) alignedStripe.getSpanInBlock());
-      } else if (chunk.state == StripingChunk.FETCHED) {
+      }
+    }
+    return decodeInputs;
+  }
+
+  /**
+   * Some fetched {@link StripingChunk} might be stored in original application
+   * buffer instead of prepared decode input buffers. Some others are beyond
+   * the range of the internal blocks and should correspond to all zero bytes.
+   * When all pending requests have returned, this method should be called to
+   * finalize decode input buffers.
+   */
+  public static void finalizeDecodeInputs(final byte[][] decodeInputs,
+      AlignedStripe alignedStripe) {
+    for (int i = 0; i < alignedStripe.chunks.length; i++) {
+      StripingChunk chunk = alignedStripe.chunks[i];
+      if (chunk.state == StripingChunk.FETCHED) {
         int posInBuf = 0;
         for (int j = 0; j < chunk.offsetsInBuf.size(); j++) {
           System.arraycopy(chunk.buf, chunk.offsetsInBuf.get(j),
@@ -267,39 +285,41 @@ public class StripedBlockUtil {
         }
       } else if (chunk.state == StripingChunk.ALLZERO) {
         Arrays.fill(decodeInputs[i], (byte)0);
+      } else {
+        decodeInputs[i] = null;
       }
     }
-    return decodeInputs;
   }
-
   /**
-   * Decode based on the given input buffers and schema
+   * Decode based on the given input buffers and schema.
    */
-  public static void decodeAndFillBuffer(final byte[][] decodeInputs, byte[] buf,
-      AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum) {
+  public static void decodeAndFillBuffer(final byte[][] decodeInputs,
+      byte[] buf, AlignedStripe alignedStripe, int parityBlkNum,
+      RawErasureDecoder decoder) {
+    // Step 1: prepare indices and output buffers for missing data units
     int[] decodeIndices = new int[parityBlkNum];
     int pos = 0;
     for (int i = 0; i < alignedStripe.chunks.length; i++) {
-      if (alignedStripe.chunks[i].state != StripingChunk.FETCHED &&
-          alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
+      if (alignedStripe.chunks[i].state == StripingChunk.MISSING){
         decodeIndices[pos++] = i;
       }
     }
+    decodeIndices = Arrays.copyOf(decodeIndices, pos);
+    byte[][] decodeOutputs =
+        new byte[decodeIndices.length][(int) alignedStripe.getSpanInBlock()];
 
-    byte[][] outputs = new byte[parityBlkNum][(int) alignedStripe.getSpanInBlock()];
-    RSRawDecoder rsRawDecoder = new RSRawDecoder(dataBlkNum, parityBlkNum);
-    rsRawDecoder.decode(decodeInputs, decodeIndices, outputs);
+    // Step 2: decode into prepared output buffers
+    decoder.decode(decodeInputs, decodeIndices, decodeOutputs);
 
-    for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
-      StripingChunk chunk = alignedStripe.chunks[i];
+    // Step 3: fill original application buffer with decoded data
+    for (int i = 0; i < decodeIndices.length; i++) {
+      int missingBlkIdx = decodeIndices[i];
+      StripingChunk chunk = alignedStripe.chunks[missingBlkIdx];
       if (chunk.state == StripingChunk.MISSING) {
         int srcPos = 0;
         for (int j = 0; j < chunk.offsetsInBuf.size(); j++) {
-          //TODO: workaround (filling fixed bytes), to remove after HADOOP-11938
-//          System.arraycopy(outputs[i], srcPos, buf, chunk.offsetsInBuf.get(j),
-//              chunk.lengthsInBuf.get(j));
-          Arrays.fill(buf, chunk.offsetsInBuf.get(j),
-              chunk.offsetsInBuf.get(j) + chunk.lengthsInBuf.get(j), (byte)7);
+          System.arraycopy(decodeOutputs[i], srcPos, buf, chunk.offsetsInBuf.get(j),
+              chunk.lengthsInBuf.get(j));
           srcPos += chunk.lengthsInBuf.get(j);
         }
       }
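
The StripedBlockUtil changes above all revolve around the raw coder's input convention: decode inputs hold the data units first and the parity units after them, a null entry marks an erased unit, and erasedIndexes lists exactly those null positions. finalizeDecodeInputs is what establishes that invariant before decode runs. A self-contained round trip of the convention (RSRawEncoder is assumed to mirror the decoder's (dataUnits, parityUnits) constructor shown in this patch; unit counts and cell size are arbitrary):

    import java.util.Arrays;

    import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
    import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;

    public class RawCoderRoundTrip {
      public static void main(String[] args) {
        final int dataUnits = 3, parityUnits = 2, cellSize = 8;

        // Fill the data units with recognizable bytes and compute parity.
        byte[][] data = new byte[dataUnits][cellSize];
        for (int i = 0; i < dataUnits; i++) {
          Arrays.fill(data[i], (byte) (i + 1));
        }
        byte[][] parity = new byte[parityUnits][cellSize];
        new RSRawEncoder(dataUnits, parityUnits).encode(data, parity);

        // Decode inputs are laid out as [data..., parity...]; null == erased.
        byte[][] inputs = new byte[dataUnits + parityUnits][];
        System.arraycopy(data, 0, inputs, 0, dataUnits);
        System.arraycopy(parity, 0, inputs, dataUnits, parityUnits);
        byte[] lost = inputs[1];
        inputs[1] = null;
        int[] erasedIndexes = {1};

        // One output buffer per erased unit, matching erasedIndexes order.
        byte[][] outputs = new byte[erasedIndexes.length][cellSize];
        new RSRawDecoder(dataUnits, parityUnits)
            .decode(inputs, erasedIndexes, outputs);

        System.out.println("recovered: " + Arrays.equals(lost, outputs[0]));
      }
    }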

+ 6 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java

@@ -221,13 +221,13 @@ public class TestDFSStripedInputStream {
         decodeInputs[DATA_BLK_NUM][k] = SimulatedFSDataset.simulatedByte(
             new Block(bg.getBlock().getBlockId() + DATA_BLK_NUM), posInBlk);
       }
-//      RSRawDecoder rsRawDecoder = new RSRawDecoder();
-//      rsRawDecoder.initialize(DATA_BLK_NUM, PARITY_BLK_NUM, CELLSIZE);
-//      rsRawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs);
+      for (int m : missingBlkIdx) {
+        decodeInputs[m] = null;
+      }
+      RSRawDecoder rsRawDecoder = new RSRawDecoder(DATA_BLK_NUM, PARITY_BLK_NUM);
+      rsRawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs);
       int posInBuf = i * CELLSIZE * DATA_BLK_NUM + failedDNIdx * CELLSIZE;
-//      System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE);
-      //TODO: workaround (filling fixed bytes), to remove after HADOOP-11938
-      Arrays.fill(expected, posInBuf, posInBuf + CELLSIZE, (byte)7);
+      System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE);
     }
     int delta = 10;
     int done = 0;

+ 1 - 7
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java

@@ -382,15 +382,9 @@ public class TestWriteReadStripedFile {
       Assert.assertEquals("The length of file should be the same to write size",
           length - startOffsetInFile, readLen);
 
-      RSRawDecoder rsRawDecoder = new RSRawDecoder(dataBlocks, parityBlocks);
       byte[] expected = new byte[readLen];
       for (int i = startOffsetInFile; i < length; i++) {
-        //TODO: workaround (filling fixed bytes), to remove after HADOOP-11938
-        if ((i / cellSize) % dataBlocks == failedDNIdx) {
-          expected[i - startOffsetInFile] = (byte)7;
-        } else {
-          expected[i - startOffsetInFile] = getByte(i);
-        }
+        expected[i - startOffsetInFile] = getByte(i);
       }
       for (int i = startOffsetInFile; i < length; i++) {
         Assert.assertEquals("Byte at " + i + " should be the same",