
HDFS-9180. Update excluded DataNodes in DFSStripedOutputStream based on failures in data streamers. Contributed by Jing Zhao.

Jing Zhao 9 years ago
parent
commit
a8b4d0ff28
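
The hunks below wire the per-streamer exclusion lists into block-group allocation: each StripedDataStreamer records the DataNodes it failed against, the new DFSStripedOutputStream#getExcludedNodes() unions them, and allocateNewBlock() passes the union to addBlock() instead of null. A minimal, self-contained sketch of that union step (hypothetical example class, not part of the commit; plain Strings stand in for DatanodeInfo):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only. Mirrors the new DFSStripedOutputStream#getExcludedNodes():
// collect the non-null excluded nodes reported by every streamer into one
// array, which is then handed to the next addBlock() call.
public class ExcludedUnionExample {
  static String[] unionExcluded(List<String[]> perStreamerExclusions) {
    List<String> excluded = new ArrayList<>();
    for (String[] fromOneStreamer : perStreamerExclusions) {
      for (String node : fromOneStreamer) {
        if (node != null) {
          excluded.add(node);
        }
      }
    }
    return excluded.toArray(new String[0]);
  }

  public static void main(String[] args) {
    List<String[]> exclusions = Arrays.asList(
        new String[]{"dn1:9866"}, new String[0], new String[]{"dn4:9866"});
    System.out.println(Arrays.toString(unionExcluded(exclusions)));
    // prints: [dn1:9866, dn4:9866]
  }
}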

+ 75 - 32
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java

@@ -377,17 +377,21 @@ public class DFSStripedOutputStream extends DFSOutputStream {
 
   private void replaceFailedStreamers() {
     assert streamers.size() == numAllBlocks;
+    final int currentIndex = getCurrentIndex();
+    assert currentIndex == 0;
     for (short i = 0; i < numAllBlocks; i++) {
       final StripedDataStreamer oldStreamer = getStripedDataStreamer(i);
       if (!oldStreamer.isHealthy()) {
+        LOG.info("replacing previously failed streamer " + oldStreamer);
         StripedDataStreamer streamer = new StripedDataStreamer(oldStreamer.stat,
             dfsClient, src, oldStreamer.progress,
             oldStreamer.checksum4WriteBlock, cachingStrategy, byteArrayManager,
             favoredNodes, i, coordinator);
         streamers.set(i, streamer);
         currentPackets[i] = null;
-        if (i == 0) {
+        if (i == currentIndex) {
           this.streamer = streamer;
+          this.currentPacket = null;
         }
         streamer.start();
       }
@@ -404,6 +408,18 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     }
   }
 
+  private DatanodeInfo[] getExcludedNodes() {
+    List<DatanodeInfo> excluded = new ArrayList<>();
+    for (StripedDataStreamer streamer : streamers) {
+      for (DatanodeInfo e : streamer.getExcludedNodes()) {
+        if (e != null) {
+          excluded.add(e);
+        }
+      }
+    }
+    return excluded.toArray(new DatanodeInfo[excluded.size()]);
+  }
+
   private void allocateNewBlock() throws IOException {
     if (currentBlockGroup != null) {
       for (int i = 0; i < numAllBlocks; i++) {
@@ -412,17 +428,17 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       }
     }
     failedStreamers.clear();
+    DatanodeInfo[] excludedNodes = getExcludedNodes();
+    LOG.debug("Excluding DataNodes when allocating new block: "
+        + Arrays.asList(excludedNodes));
+
     // replace failed streamers
     replaceFailedStreamers();
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Allocating new block group. The previous block group: "
-          + currentBlockGroup);
-    }
-
-    // TODO collect excludedNodes from all the data streamers
-    final LocatedBlock lb = addBlock(null, dfsClient, src, currentBlockGroup,
-        fileId, favoredNodes);
+    LOG.debug("Allocating new block group. The previous block group: "
+        + currentBlockGroup);
+    final LocatedBlock lb = addBlock(excludedNodes, dfsClient, src,
+        currentBlockGroup, fileId, favoredNodes);
     assert lb.isStriped();
     if (lb.getLocations().length < numDataBlocks) {
       throw new IOException("Failed to get " + numDataBlocks
@@ -437,18 +453,17 @@ public class DFSStripedOutputStream extends DFSOutputStream {
         numAllBlocks - numDataBlocks);
     for (int i = 0; i < blocks.length; i++) {
       StripedDataStreamer si = getStripedDataStreamer(i);
-      if (si.isHealthy()) { // skipping failed data streamer
-        if (blocks[i] == null) {
-          // Set exception and close streamer as there is no block locations
-          // found for the parity block.
-          LOG.warn("Failed to get block location for parity block, index=" + i);
-          si.getLastException().set(
-              new IOException("Failed to get following block, i=" + i));
-          si.getErrorState().setInternalError();
-          si.close(true);
-        } else {
-          coordinator.getFollowingBlocks().offer(i, blocks[i]);
-        }
+      assert si.isHealthy();
+      if (blocks[i] == null) {
+        // Set exception and close streamer as there is no block locations
+        // found for the parity block.
+        LOG.warn("Failed to get block location for parity block, index=" + i);
+        si.getLastException().set(
+            new IOException("Failed to get following block, i=" + i));
+        si.getErrorState().setInternalError();
+        si.close(true);
+      } else {
+        coordinator.getFollowingBlocks().offer(i, blocks[i]);
       }
     }
   }
@@ -462,7 +477,6 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   protected synchronized void writeChunk(byte[] bytes, int offset, int len,
       byte[] checksum, int ckoff, int cklen) throws IOException {
     final int index = getCurrentIndex();
-    final StripedDataStreamer current = getCurrentStreamer();
     final int pos = cellBuffers.addTo(index, bytes, offset, len);
     final boolean cellFull = pos == cellSize;
 
@@ -472,6 +486,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     }
 
     currentBlockGroup.setNumBytes(currentBlockGroup.getNumBytes() + len);
+    // note: the current streamer can be refreshed after allocating a new block
+    final StripedDataStreamer current = getCurrentStreamer();
     if (current.isHealthy()) {
       try {
         super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
@@ -492,11 +508,11 @@ public class DFSStripedOutputStream extends DFSOutputStream {
         cellBuffers.flipDataBuffers();
         writeParityCells();
         next = 0;
-        // check failure state for all the streamers. Bump GS if necessary
-        checkStreamerFailures();
 
         // if this is the end of the block group, end each internal block
         if (shouldEndBlockGroup()) {
+          flushAllInternals();
+          checkStreamerFailures();
           for (int i = 0; i < numAllBlocks; i++) {
             final StripedDataStreamer s = setCurrentStreamer(i);
             if (s.isHealthy()) {
@@ -505,6 +521,9 @@ public class DFSStripedOutputStream extends DFSOutputStream {
               } catch (IOException ignored) {}
             }
           }
+        } else {
+          // check failure state for all the streamers. Bump GS if necessary
+          checkStreamerFailures();
         }
       }
       setCurrentStreamer(next);
@@ -522,11 +541,32 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     // no need to end block here
   }
 
+  /**
+   * @return whether the data streamer with the given index is streaming data.
+   * Note the streamer may not be in STREAMING stage if the block length is less
+   * than a stripe.
+   */
+  private boolean isStreamerWriting(int streamerIndex) {
+    final long length = currentBlockGroup == null ?
+        0 : currentBlockGroup.getNumBytes();
+    if (length == 0) {
+      return false;
+    }
+    if (streamerIndex >= numDataBlocks) {
+      return true;
+    }
+    final int numCells = (int) ((length - 1) / cellSize + 1);
+    return streamerIndex < numCells;
+  }
+
   private Set<StripedDataStreamer> markExternalErrorOnStreamers() {
     Set<StripedDataStreamer> healthySet = new HashSet<>();
-    for (StripedDataStreamer streamer : streamers) {
-      if (streamer.isHealthy() &&
-          streamer.getStage() == BlockConstructionStage.DATA_STREAMING) {
+    for (int i = 0; i < numAllBlocks; i++) {
+      final StripedDataStreamer streamer = getStripedDataStreamer(i);
+      if (streamer.isHealthy() && isStreamerWriting(i)) {
+        Preconditions.checkState(
+            streamer.getStage() == BlockConstructionStage.DATA_STREAMING,
+            "streamer: " + streamer);
         streamer.setExternalError();
         healthySet.add(streamer);
       }
@@ -541,12 +581,14 @@ public class DFSStripedOutputStream extends DFSOutputStream {
    */
   private void checkStreamerFailures() throws IOException {
     Set<StripedDataStreamer> newFailed = checkStreamers();
-    if (newFailed.size() > 0) {
-      // for healthy streamers, wait till all of them have fetched the new block
-      // and flushed out all the enqueued packets.
-      flushAllInternals();
+    if (newFailed.size() == 0) {
+      return;
     }
-    // get all the current failed streamers after the flush
+
+    // for healthy streamers, wait till all of them have fetched the new block
+    // and flushed out all the enqueued packets.
+    flushAllInternals();
+    // recheck failed streamers again after the flush
     newFailed = checkStreamers();
     while (newFailed.size() > 0) {
       failedStreamers.addAll(newFailed);
@@ -629,6 +671,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       for (StripedDataStreamer streamer : healthyStreamers) {
         if (!coordinator.updateStreamerMap.containsKey(streamer)) {
           // close the streamer if it is too slow to create new connection
+          LOG.info("close the slow stream " + streamer);
           streamer.setStreamerAsClosed();
           failed.add(streamer);
         }
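
The new isStreamerWriting() helper decides, from the current block-group length alone, which streamers can legitimately be in the DATA_STREAMING stage when an external error is marked. A standalone rendering of the same cell arithmetic, assuming the 6-data/3-parity, 64 KB-cell layout used by the tests (example class and main() are illustrative, not part of the commit):

// Standalone illustration of the cell arithmetic behind isStreamerWriting().
// A data streamer i has received data only if the block group already spans
// more than i full cells; parity streamers write whenever any data exists.
public class CellMathExample {
  static final int CELL_SIZE = 64 * 1024;   // BLOCK_STRIPED_CELL_SIZE in the tests
  static final int NUM_DATA_BLOCKS = 6;

  static boolean isWriting(int streamerIndex, long blockGroupBytes) {
    if (blockGroupBytes == 0) {
      return false;                          // nothing written to this group yet
    }
    if (streamerIndex >= NUM_DATA_BLOCKS) {
      return true;                           // parity streamer
    }
    int numCells = (int) ((blockGroupBytes - 1) / CELL_SIZE + 1);
    return streamerIndex < numCells;         // only the first numCells data streamers
  }

  public static void main(String[] args) {
    long len = 100 * 1024;                   // 100 KB spans two 64 KB cells
    System.out.println(isWriting(1, len));   // true  (data streamer 1)
    System.out.println(isWriting(2, len));   // false (cell 2 not reached yet)
    System.out.println(isWriting(7, len));   // true  (parity streamer)
  }
}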

+ 12 - 11
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java

@@ -359,7 +359,7 @@ class DataStreamer extends Daemon {
   private volatile String[] storageIDs = null;
   private final ErrorState errorState;
 
-  private BlockConstructionStage stage;  // block construction stage
+  private volatile BlockConstructionStage stage;  // block construction stage
   protected long bytesSent = 0; // number of bytes that've been sent
   private final boolean isLazyPersistFile;
 
@@ -588,7 +588,7 @@ class DataStreamer extends Daemon {
           LOG.debug("stage=" + stage + ", " + this);
         }
         if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
-          LOG.debug("Allocating new block");
+          LOG.debug("Allocating new block: " + this);
           setPipeline(nextBlockOutputStream());
           initDataStreaming();
         } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
@@ -748,7 +748,7 @@ class DataStreamer extends Daemon {
   void waitForAckedSeqno(long seqno) throws IOException {
     try (TraceScope ignored = dfsClient.getTracer().
         newScope("waitForAckedSeqno")) {
-      LOG.debug("Waiting for ack for: {}", seqno);
+      LOG.debug("{} waiting for ack for: {}", this, seqno);
       long begin = Time.monotonicNow();
       try {
         synchronized (dataQueue) {
@@ -1085,6 +1085,7 @@ class DataStreamer extends Daemon {
     if (!errorState.hasDatanodeError() && !shouldHandleExternalError()) {
       return false;
     }
+    LOG.debug("start process datanode/external error, {}", this);
     if (response != null) {
       LOG.info("Error Recovery for " + block +
           " waiting for responder to exit. ");
@@ -1307,10 +1308,12 @@ class DataStreamer extends Daemon {
    * It keeps on trying until a pipeline is setup
    */
   private void setupPipelineForAppendOrRecovery() throws IOException {
-    // check number of datanodes
+    // Check number of datanodes. Note that if there is no healthy datanode,
+    // this must be internal error because we mark external error in striped
+    // outputstream only when all the streamers are in the DATA_STREAMING stage
     if (nodes == null || nodes.length == 0) {
       String msg = "Could not get block locations. " + "Source file \""
-          + src + "\" - Aborting...";
+          + src + "\" - Aborting..." + this;
       LOG.warn(msg);
       lastException.set(new IOException(msg));
       streamerClosed = true;
@@ -1462,8 +1465,9 @@ class DataStreamer extends Daemon {
     return newBlock;
   }
 
-  private int getNumBlockWriteRetry() {
-    return dfsClient.getConf().getNumBlockWriteRetry();
+  DatanodeInfo[] getExcludedNodes() {
+    return excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
+            .keySet().toArray(new DatanodeInfo[0]);
   }
 
   /**
@@ -1483,10 +1487,7 @@ class DataStreamer extends Daemon {
       errorState.resetInternalError();
       lastException.clear();
 
-      DatanodeInfo[] excluded =
-          excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
-              .keySet()
-              .toArray(new DatanodeInfo[0]);
+      DatanodeInfo[] excluded = getExcludedNodes();
       block = oldBlock;
       lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
       block = lb.getBlock();
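
The extracted DataStreamer#getExcludedNodes() simply snapshots whatever is still present in the streamer's excludedNodes cache (a Guava cache with time-based expiry, as the getAllPresent/asMap calls above imply), so nodes excluded long ago age out of the set that DFSStripedOutputStream forwards to the NameNode. A self-contained sketch of that snapshot pattern (the expiry window is an assumption for the sketch; Strings stand in for DatanodeInfo):

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

// Standalone sketch of the snapshot pattern used by getExcludedNodes():
// entries live in a time-expiring Guava cache, and getAllPresent() returns
// only the ones that have not expired yet.
public class ExcludedCacheExample {
  private final Cache<String, String> excludedNodes = CacheBuilder.newBuilder()
      .expireAfterWrite(10, TimeUnit.MINUTES)   // assumed expiry window for illustration
      .build();

  void exclude(String node) {
    excludedNodes.put(node, node);
  }

  String[] getExcludedNodes() {
    return excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
        .keySet().toArray(new String[0]);
  }

  public static void main(String[] args) {
    ExcludedCacheExample e = new ExcludedCacheExample();
    e.exclude("dn1:9866");
    e.exclude("dn3:9866");
    System.out.println(Arrays.toString(e.getExcludedNodes()));
  }
}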

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java

@@ -105,7 +105,7 @@ public class StripedDataStreamer extends DataStreamer {
       final DatanodeInfo badNode = nodes[getErrorState().getBadNodeIndex()];
       LOG.info("Excluding datanode " + badNode);
       excludedNodes.put(badNode, badNode);
-      throw new IOException("Unable to create new block.");
+      throw new IOException("Unable to create new block." + this);
     }
     return lb;
   }

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt

@@ -455,3 +455,6 @@
     DFSStripedOutputStream. (jing9 and Walter Su)
 
     HDFS-9185. Fix null tracer in ErasureCodingWorker. (Rakesh R via jing9)
+
+    HDFS-9180. Update excluded DataNodes in DFSStripedOutputStream based on failures
+    in data streamers. (jing9)

+ 37 - 21
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java

@@ -44,6 +44,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -58,6 +59,7 @@ public class StripedFileTestUtil {
   public static final short NUM_DATA_BLOCKS = (short) 6;
   public static final short NUM_PARITY_BLOCKS = (short) 3;
   public static final int BLOCK_STRIPED_CELL_SIZE = 64 * 1024;
+  public static final int BLOCK_STRIPE_SIZE = BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS;
 
   static final int stripesPerBlock = 4;
   static final int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock;
@@ -113,7 +115,9 @@ public class StripedFileTestUtil {
           offset += target;
         }
         for (int i = 0; i < fileLength - startOffset; i++) {
-          assertEquals("Byte at " + (startOffset + i) + " is different, " + "the startOffset is " + startOffset, expected[startOffset + i], result[i]);
+          assertEquals("Byte at " + (startOffset + i) + " is different, "
+              + "the startOffset is " + startOffset, expected[startOffset + i],
+              result[i]);
         }
       }
     }
@@ -251,11 +255,17 @@ public class StripedFileTestUtil {
     return (short) (getRealDataBlockNum(numBytes) + NUM_PARITY_BLOCKS);
   }
 
+  public static void waitBlockGroupsReported(DistributedFileSystem fs,
+      String src) throws Exception {
+    waitBlockGroupsReported(fs, src, 0);
+  }
+
   /**
-   * Wait for all the internalBlocks of the blockGroups of the given file to be reported.
+   * Wait for all the internalBlocks of the blockGroups of the given file to be
+   * reported.
    */
-  public static void waitBlockGroupsReported(DistributedFileSystem fs, String src)
-      throws IOException, InterruptedException, TimeoutException {
+  public static void waitBlockGroupsReported(DistributedFileSystem fs,
+      String src, int numDeadDNs) throws Exception {
     boolean success;
     final int ATTEMPTS = 40;
     int count = 0;
@@ -265,11 +275,12 @@ public class StripedFileTestUtil {
       count++;
       LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0);
       for (LocatedBlock lb : lbs.getLocatedBlocks()) {
-        short expected = getRealTotalBlockNum((int) lb.getBlockSize());
+        short expected = (short) (getRealTotalBlockNum((int) lb.getBlockSize())
+            - numDeadDNs);
         int reported = lb.getLocations().length;
-        if (reported != expected){
+        if (reported < expected){
           success = false;
-          System.out.println("blockGroup " + lb.getBlock() + " of file " + src
+          LOG.info("blockGroup " + lb.getBlock() + " of file " + src
               + " has reported internalBlocks " + reported
               + " (desired " + expected + "); locations "
               + Joiner.on(' ').join(lb.getLocations()));
@@ -278,7 +289,7 @@ public class StripedFileTestUtil {
         }
       }
       if (success) {
-        System.out.println("All blockGroups of file " + src
+        LOG.info("All blockGroups of file " + src
             + " verified to have all internalBlocks.");
       }
     } while (!success && count < ATTEMPTS);
@@ -348,10 +359,9 @@ public class StripedFileTestUtil {
   }
 
   static void checkData(DistributedFileSystem dfs, Path srcPath, int length,
-      int[] killedDnIndex, long oldGS) throws IOException {
+      List<DatanodeInfo> killedList, List<Long> oldGSList) throws IOException {
 
     StripedFileTestUtil.verifyLength(dfs, srcPath, length);
-    Arrays.sort(killedDnIndex);
     List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
     LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(srcPath.toString(), 0L,
         Long.MAX_VALUE);
@@ -361,10 +371,12 @@ public class StripedFileTestUtil {
     }
     assertEquals(expectedNumGroup, lbs.getLocatedBlocks().size());
 
+    int index = 0;
     for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
       Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
 
       final long gs = firstBlock.getBlock().getGenerationStamp();
+      final long oldGS = oldGSList != null ? oldGSList.get(index++) : -1L;
       final String s = "gs=" + gs + ", oldGS=" + oldGS;
       LOG.info(s);
       Assert.assertTrue(s, gs >= oldGS);
@@ -389,6 +401,7 @@ public class StripedFileTestUtil {
       byte[][] dataBlockBytes = new byte[NUM_DATA_BLOCKS][];
       byte[][] parityBlockBytes = new byte[NUM_PARITY_BLOCKS][];
 
+      Set<Integer> checkSet = new HashSet<>();
       // for each block, use BlockReader to read data
       for (int i = 0; i < blockList.size(); i++) {
         final int j = i >= NUM_DATA_BLOCKS? 0: i;
@@ -417,19 +430,22 @@ public class StripedFileTestUtil {
           continue;
         }
 
-        if (Arrays.binarySearch(killedDnIndex, i) < 0) {
+        DatanodeInfo dn = blockList.get(i).getLocations()[0];
+        if (!killedList.contains(dn)) {
           final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
               dfs, lb, 0, block.getNumBytes());
           blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
           blockReader.close();
+          checkSet.add(i);
         }
       }
+      LOG.info("Internal blocks to check: " + checkSet);
 
       // check data
       final int groupPosInFile = group*BLOCK_GROUP_SIZE;
       for (int i = 0; i < dataBlockBytes.length; i++) {
         boolean killed = false;
-        if (Arrays.binarySearch(killedDnIndex, i) >= 0){
+        if (!checkSet.contains(i)) {
           killed = true;
         }
         final byte[] actual = dataBlockBytes[i];
@@ -453,15 +469,15 @@ public class StripedFileTestUtil {
       }
 
       // check parity
-      verifyParityBlocks(dfs.getConf(), lbs.getLocatedBlocks().get(group)
-              .getBlockSize(),
-          BLOCK_STRIPED_CELL_SIZE, dataBlockBytes, parityBlockBytes, killedDnIndex);
+      verifyParityBlocks(dfs.getConf(),
+          lbs.getLocatedBlocks().get(group).getBlockSize(),
+          BLOCK_STRIPED_CELL_SIZE, dataBlockBytes, parityBlockBytes, checkSet);
     }
   }
 
-  static void verifyParityBlocks(Configuration conf, final long size, final int cellSize,
-      byte[][] dataBytes, byte[][] parityBytes, int[] killedDnIndex) {
-    Arrays.sort(killedDnIndex);
+  static void verifyParityBlocks(Configuration conf, final long size,
+      final int cellSize, byte[][] dataBytes, byte[][] parityBytes,
+      Set<Integer> checkSet) {
     // verify the parity blocks
     int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(
         size, cellSize, dataBytes.length, dataBytes.length);
@@ -482,9 +498,9 @@ public class StripedFileTestUtil {
         CodecUtil.createRSRawEncoder(conf, dataBytes.length, parityBytes.length);
     encoder.encode(dataBytes, expectedParityBytes);
     for (int i = 0; i < parityBytes.length; i++) {
-      if (Arrays.binarySearch(killedDnIndex, dataBytes.length + i) < 0){
-        Assert.assertArrayEquals("i=" + i + ", killedDnIndex=" + Arrays.toString(killedDnIndex),
-            expectedParityBytes[i], parityBytes[i]);
+      if (checkSet.contains(i + dataBytes.length)){
+        Assert.assertArrayEquals("i=" + i, expectedParityBytes[i],
+            parityBytes[i]);
       }
     }
   }

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java

@@ -18,11 +18,15 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
@@ -151,6 +155,6 @@ public class TestDFSStripedOutputStream {
     StripedFileTestUtil.waitBlockGroupsReported(fs, src);
 
     StripedFileTestUtil.checkData(fs, testPath, writeBytes,
-        new int[]{}, 0);
+        new ArrayList<DatanodeInfo>(), null);
   }
 }

+ 51 - 21
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java

@@ -24,23 +24,25 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.security.token.Token;
@@ -59,6 +61,9 @@ public class TestDFSStripedOutputStreamWithFailure {
   static {
     GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
     GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL);
+    ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class))
+        .getLogger().setLevel(Level.ALL);
   }
 
   private static final int NUM_DATA_BLOCKS = StripedFileTestUtil.NUM_DATA_BLOCKS;
@@ -134,6 +139,7 @@ public class TestDFSStripedOutputStreamWithFailure {
   private DistributedFileSystem dfs;
   private final Path dir = new Path("/"
       + TestDFSStripedOutputStreamWithFailure.class.getSimpleName());
+  private final Random random = new Random();
 
   private void setup(Configuration conf) throws IOException {
     final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
@@ -153,7 +159,8 @@ public class TestDFSStripedOutputStreamWithFailure {
   private HdfsConfiguration newHdfsConfiguration() {
     final HdfsConfiguration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+        false);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
     return conf;
@@ -164,11 +171,31 @@ public class TestDFSStripedOutputStreamWithFailure {
     runTest(getLength(56));
   }
 
+  @Test(timeout=240000)
+  public void testDatanodeFailureRandomLength() throws Exception {
+    int lenIndex = random.nextInt(LENGTHS.size());
+    LOG.info("run testMultipleDatanodeFailureRandomLength with length index: "
+        + lenIndex);
+    runTest(getLength(lenIndex));
+  }
+
   @Test(timeout=240000)
   public void testMultipleDatanodeFailure56() throws Exception {
     runTestWithMultipleFailure(getLength(56));
   }
 
+  /**
+   * Randomly pick a length and run tests with multiple data failures
+   * TODO: enable this later
+   */
+  //@Test(timeout=240000)
+  public void testMultipleDatanodeFailureRandomLength() throws Exception {
+    int lenIndex = random.nextInt(LENGTHS.size());
+    LOG.info("run testMultipleDatanodeFailureRandomLength with length index: "
+        + lenIndex);
+    runTestWithMultipleFailure(getLength(lenIndex));
+  }
+
   @Test(timeout=240000)
   public void testBlockTokenExpired() throws Exception {
     final int length = NUM_DATA_BLOCKS * (BLOCK_SIZE - CELL_SIZE);
@@ -208,11 +235,10 @@ public class TestDFSStripedOutputStreamWithFailure {
       }
       cluster.restartNameNodes();
       cluster.triggerHeartbeats();
-      DatanodeInfo[] info = dfs.getClient().datanodeReport(
-          DatanodeReportType.LIVE);
+      DatanodeInfo[] info = dfs.getClient().datanodeReport(DatanodeReportType.LIVE);
       assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
       final Path dirFile = new Path(dir, "ecfile");
-      FSDataOutputStream out = null;
+      FSDataOutputStream out;
       try {
         out = dfs.create(dirFile, true);
         out.write("something".getBytes());
@@ -262,6 +288,7 @@ public class TestDFSStripedOutputStreamWithFailure {
     final HdfsConfiguration conf = newHdfsConfiguration();
     for (int dn = 0; dn < 9; dn++) {
       try {
+        LOG.info("runTest: dn=" + dn + ", length=" + length);
         setup(conf);
         runTest(length, new int[]{length/2}, new int[]{dn}, false);
       } catch (Throwable e) {
@@ -277,10 +304,11 @@ public class TestDFSStripedOutputStreamWithFailure {
 
   void runTestWithMultipleFailure(final int length) throws Exception {
     final HdfsConfiguration conf = newHdfsConfiguration();
-    for(int i=0;i<dnIndexSuite.length;i++){
-      int[] dnIndex = dnIndexSuite[i];
+    for (int[] dnIndex : dnIndexSuite) {
       int[] killPos = getKillPositions(length, dnIndex.length);
       try {
+        LOG.info("runTestWithMultipleFailure: length==" + length + ", killPos="
+            + Arrays.toString(killPos) + ", dnIndex=" + Arrays.toString(dnIndex));
         setup(conf);
         runTest(length, killPos, dnIndex, false);
       } catch (Throwable e) {
@@ -334,6 +362,8 @@ public class TestDFSStripedOutputStreamWithFailure {
 
     long firstGS = -1;  // first GS of this block group which never proceeds blockRecovery
     long oldGS = -1; // the old GS before bumping
+    List<Long> gsList = new ArrayList<>();
+    final List<DatanodeInfo> killedDN = new ArrayList<>();
     int numKilled=0;
     for(; pos.get() < length; ) {
       final int i = pos.getAndIncrement();
@@ -353,7 +383,7 @@ public class TestDFSStripedOutputStreamWithFailure {
           waitTokenExpires(out);
         }
 
-        killDatanode(cluster, stripedOut, dnIndex[numKilled], pos);
+        killedDN.add(killDatanode(cluster, stripedOut, dnIndex[numKilled], pos));
         numKilled++;
       }
 
@@ -363,20 +393,18 @@ public class TestDFSStripedOutputStreamWithFailure {
         firstGS = getGenerationStamp(stripedOut);
         oldGS = firstGS;
       }
+      if (i > 0 && (i + 1) % BLOCK_GROUP_SIZE == 0) {
+        gsList.add(oldGS);
+      }
     }
+    gsList.add(oldGS);
     out.close();
     assertEquals(dnIndex.length, numKilled);
 
-    short expectedReported = StripedFileTestUtil.getRealTotalBlockNum(length);
-    for(int idx :dnIndex) {
-      if (length > idx * CELL_SIZE || idx >= NUM_DATA_BLOCKS) {
-        expectedReported--;
-      }
-    }
-    DFSTestUtil.waitReplication(dfs, p, expectedReported);
+    StripedFileTestUtil.waitBlockGroupsReported(dfs, fullPath, numKilled);
 
     cluster.triggerBlockReports();
-    StripedFileTestUtil.checkData(dfs, p, length, dnIndex, oldGS);
+    StripedFileTestUtil.checkData(dfs, p, length, killedDN, gsList);
   }
 
   static void write(FSDataOutputStream out, int i) throws IOException {
@@ -389,8 +417,7 @@ public class TestDFSStripedOutputStreamWithFailure {
 
   static long getGenerationStamp(DFSStripedOutputStream out)
       throws IOException {
-    DFSTestUtil.flushBuffer(out);
-    final long gs = DFSTestUtil.flushInternal(out).getGenerationStamp();
+    final long gs = out.getBlock().getGenerationStamp();
     LOG.info("getGenerationStamp returns " + gs);
     return gs;
   }
@@ -421,12 +448,15 @@ public class TestDFSStripedOutputStreamWithFailure {
     }
   }
 
-  static void killDatanode(MiniDFSCluster cluster, DFSStripedOutputStream out,
-      final int dnIndex, final AtomicInteger pos) {
+  static DatanodeInfo killDatanode(MiniDFSCluster cluster,
+      DFSStripedOutputStream out, final int dnIndex, final AtomicInteger pos) {
     final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex);
     final DatanodeInfo datanode = getDatanodes(s);
     LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos);
-    cluster.stopDataNode(datanode.getXferAddr());
+    if (datanode != null) {
+      cluster.stopDataNode(datanode.getXferAddr());
+    }
+    return datanode;
   }
 
 

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java

@@ -66,6 +66,7 @@ public class TestReadStripedFileWithDecoding {
     Configuration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
     cluster = new MiniDFSCluster.Builder(new HdfsConfiguration())
         .numDataNodes(numDNs).build();
     cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);

+ 4 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
 import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
@@ -49,6 +50,9 @@ public class TestWriteReadStripedFile {
   private static Configuration conf = new HdfsConfiguration();
 
   static {
+    GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL);
     ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class))
         .getLogger().setLevel(Level.ALL);
   }