
HDFS-16896 clear ignoredNodes list when we clear deadnode list on ref… (#5322) (#5444)

Cherry picked from: 162288bc0af944f116a4dd73e27f4676a204d9e9

Co-authored-by: Tom McCormick <tmccormi@linkedin.com>
Tom committed 2 years ago · commit 703158c9c6
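
Context for the change: DFSInputStream tracks datanodes to skip in two places, a stream-level dead-node list and a per-read ignoredNodes collection that hedged reads use so the same node never gets concurrent requests. Before this fix, the refetch path cleared only the dead-node list, so once every replica had landed in ignoredNodes no candidate was ever eligible again. A minimal, self-contained sketch of that failure mode (hypothetical names, not the actual DFSInputStream code):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class IgnoredNodesSketch {
      // A replica is a usable read candidate only if it is neither dead nor ignored.
      static boolean anyEligible(List<String> replicas, Set<String> dead, List<String> ignored) {
        return replicas.stream().anyMatch(dn -> !dead.contains(dn) && !ignored.contains(dn));
      }

      public static void main(String[] args) {
        List<String> replicas = Arrays.asList("dn1", "dn2", "dn3");
        Set<String> deadNodes = new HashSet<>(replicas);       // every replica has failed
        List<String> ignoredNodes = new ArrayList<>(replicas); // every replica hedged against

        deadNodes.clear(); // pre-fix refetch cleared only the dead-node list
        System.out.println(anyEligible(replicas, deadNodes, ignoredNodes)); // false: read is stuck

        ignoredNodes.clear(); // the fix clears ignoredNodes as well
        System.out.println(anyEligible(replicas, deadNodes, ignoredNodes)); // true: retry can proceed
      }
    }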

+ 29 - 5
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -224,7 +224,7 @@ public class DFSInputStream extends FSInputStream
   }
 
   /**
-   * Grab the open-file info from namenode
+   * Grab the open-file info from namenode.
    * @param refreshLocatedBlocks whether to re-fetch locatedblocks
    */
   void openInfo(boolean refreshLocatedBlocks) throws IOException {
@@ -940,7 +940,8 @@ public class DFSInputStream extends FSInputStream
    * @return Returns chosen DNAddrPair; Can be null if refetchIfRequired is
    * false.
    */
-  private DNAddrPair chooseDataNode(LocatedBlock block,
+  @VisibleForTesting
+  DNAddrPair chooseDataNode(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes, boolean refetchIfRequired)
       throws IOException {
     while (true) {
@@ -955,6 +956,14 @@ public class DFSInputStream extends FSInputStream
     }
   }
 
+  /**
+   * RefetchLocations should only be called when there are no active requests
+   * to datanodes. In the hedged read case this means futures should be empty.
+   * @param block The locatedBlock to get new datanode locations for.
+   * @param ignoredNodes A list of ignored nodes. This list can be null and can be cleared.
+   * @return the locatedBlock with updated datanode locations.
+   * @throws IOException
+   */
   private LocatedBlock refetchLocations(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) throws IOException {
     String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
@@ -999,13 +1008,24 @@ public class DFSInputStream extends FSInputStream
       throw new InterruptedIOException(
           "Interrupted while choosing DataNode for read.");
     }
-    clearLocalDeadNodes(); //2nd option is to remove only nodes[blockId]
+    clearCachedNodeState(ignoredNodes);
     openInfo(true);
     block = refreshLocatedBlock(block);
     failures++;
     return block;
   }
 
+  /**
+   * Clear both the dead nodes and the ignored nodes
+   * @param ignoredNodes is cleared
+   */
+  private void clearCachedNodeState(Collection<DatanodeInfo> ignoredNodes) {
+    clearLocalDeadNodes(); //2nd option is to remove only nodes[blockId]
+    if (ignoredNodes != null) {
+      ignoredNodes.clear();
+    }
+  }
+
   /**
    * Get the best node from which to stream the data.
    * @param block LocatedBlock, containing nodes in priority order.
@@ -1337,8 +1357,12 @@ public class DFSInputStream extends FSInputStream
         } catch (InterruptedException ie) {
           // Ignore and retry
         }
-        if (refetch) {
-          refetchLocations(block, ignored);
+        // If refetch is true, then all nodes are in deadNodes or ignoredNodes.
+        // We should loop through all futures and remove them, so we do not
+        // have concurrent requests to the same node.
+        // Once all futures are cleared, we can clear the ignoredNodes and retry.
+        if (refetch && futures.isEmpty()) {
+          block = refetchLocations(block, ignored);
         }
         // We got here if exception. Ignore this node on next go around IFF
         // we found a chosenNode to hedge read against.
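
Note the ordering the new comment spells out: locations are re-fetched only once futures is empty, because clearing ignoredNodes while hedged requests are still outstanding could let the next hedge target a node that is already serving a request. A distilled sketch of that guard (hypothetical types and names, not the real hedged-read loop):

    import java.util.List;
    import java.util.concurrent.Future;

    class HedgedRefetchSketch {
      // Only clear ignoredNodes and refetch once no hedged request is in flight.
      static <T> boolean maybeRefetch(boolean refetch, List<Future<T>> futures,
          List<String> ignoredNodes) {
        if (!refetch || !futures.isEmpty()) {
          return false; // still draining outstanding hedged requests
        }
        ignoredNodes.clear(); // safe: no node has a concurrent request
        // ...refresh block locations here and let the caller retry...
        return true;
      }
    }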

+ 23 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStreamBlockLocations.java

@@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
@@ -35,11 +36,14 @@ import java.util.Map;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -200,6 +204,25 @@ public class TestDFSInputStreamBlockLocations {
     testWithRegistrationMethod(DFSInputStream::getAllBlocks);
   }
 
+  /**
+   * If the ignoreList contains all datanodes, the ignoredList should be cleared to take advantage
+   * of retries built into chooseDataNode. This is needed for hedged reads
+   * @throws IOException
+   */
+  @Test
+  public void testClearIgnoreListChooseDataNode() throws IOException {
+    final String fileName = "/test_cache_locations";
+    filePath = createFile(fileName);
+
+    try (DFSInputStream fin = dfsClient.open(fileName)) {
+      LocatedBlocks existing = fin.locatedBlocks;
+      LocatedBlock block = existing.getLastLocatedBlock();
+      ArrayList<DatanodeInfo> ignoreList = new ArrayList<>(Arrays.asList(block.getLocations()));
+      Assert.assertNotNull(fin.chooseDataNode(block, ignoreList, true));
+      Assert.assertEquals(0, ignoreList.size());
+    }
+  }
+
   @FunctionalInterface
   interface ThrowingConsumer {
     void accept(DFSInputStream fin) throws IOException;
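
The @VisibleForTesting relaxation of chooseDataNode from private to package-private in the first hunk is what lets this test drive the method directly with an ignore list containing every replica, instead of having to reproduce the whole hedged-read failure through the public read path.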

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java

@@ -603,7 +603,9 @@ public class TestPread {
       input.read(0, buffer, 0, 1024);
       Assert.fail("Reading the block should have thrown BlockMissingException");
     } catch (BlockMissingException e) {
-      assertEquals(3, input.getHedgedReadOpsLoopNumForTesting());
+      // The result of 9 is due to 2 blocks by 4 iterations plus one because
+      // hedgedReadOpsLoopNumForTesting is incremented at start of the loop.
+      assertEquals(9, input.getHedgedReadOpsLoopNumForTesting());
       assertTrue(metrics.getHedgedReadOps() == 0);
     } finally {
       Mockito.reset(injector);
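
As the new comment works out, the expected loop count is 2 blocks × 4 iterations each, plus 1 for the increment at the top of the loop on the exiting pass: 2 × 4 + 1 = 9.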