Browse Source

HDFS-17455. Fix Client throw IndexOutOfBoundsException in DFSInputStream#fetchBlockAt (#6710). Contributed by Haiyang Hu.

Reviewed-by: ZanderXu <zanderxu@apache.org>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
huhaiyang 1 year ago
parent
commit
81b05977f2

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.VisibleForTesting;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 
 /**
  * Used for injecting faults in DFSClient and DFSOutputStream tests.
@@ -69,4 +70,5 @@ public class DFSClientFaultInjector {
 
   public void onCreateBlockReader(LocatedBlock block, int chunkIndex, long offset, long length) {}
 
+  public void failCreateBlockReader() throws InvalidBlockTokenException {}
 }

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -519,6 +519,14 @@ public class DFSInputStream extends FSInputStream
         // Update the LastLocatedBlock, if offset is for last block.
         if (offset >= locatedBlocks.getFileLength()) {
           setLocatedBlocksFields(newBlocks, getLastBlockLength(newBlocks));
+          // After updating locatedBlocks, look up again the block that contains
+          // the offset, the same way DFSInputStream#getBlockAt(long) does.
+          if (offset >= locatedBlocks.getFileLength()) {
+            return locatedBlocks.getLastLocatedBlock();
+          } else {
+            targetBlockIdx = locatedBlocks.findBlock(offset);
+            assert targetBlockIdx >= 0 && targetBlockIdx < locatedBlocks.locatedBlockCount();
+          }
         } else {
           locatedBlocks.insertRange(targetBlockIdx,
               newBlocks.getLocatedBlocks());
@@ -641,6 +649,7 @@ public class DFSInputStream extends FSInputStream
       targetBlock = retval.block;
 
       try {
+        DFSClientFaultInjector.get().failCreateBlockReader();
         blockReader = getBlockReader(targetBlock, offsetIntoBlock,
             targetBlock.getBlockSize() - offsetIntoBlock, targetAddr,
             storageType, chosenNode);

+ 72 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java

@@ -31,6 +31,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -41,14 +43,21 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Retry;
 
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
 import org.junit.Assume;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestDFSInputStream {
   private void testSkipInner(MiniDFSCluster cluster) throws IOException {
@@ -287,4 +296,67 @@ public class TestDFSInputStream {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testCreateBlockReaderWhenInvalidBlockTokenException() throws
+      IOException, InterruptedException, TimeoutException {
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 64 * 1024);
+    conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 516);
+    DFSClientFaultInjector oldFaultInjector = DFSClientFaultInjector.get();
+    FSDataOutputStream out = null;
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      // Create a file that contains only one under-construction (UC) block.
+      String file = "/testfile";
+      Path path = new Path(file);
+      out = fs.create(path, (short) 3);
+      int bufferLen = 5120;
+      byte[] toWrite = new byte[bufferLen];
+      Random rb = new Random(0);
+      rb.nextBytes(toWrite);
+      out.write(toWrite, 0, bufferLen);
+
+      // Wait until the file reports exactly one block location.
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return fs.getFileBlockLocations(path, 0, bufferLen).length == 1;
+        } catch (IOException e) {
+          return false;
+        }
+      }, 100, 10000);
+
+      // Install a mocked DFSClientFaultInjector.
+      DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class));
+      DFSClientFaultInjector injector = DFSClientFaultInjector.get();
+      final AtomicInteger count = new AtomicInteger(0);
+      Mockito.doAnswer(new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          // Simulate an invalid access token on the first attempt to create a
+          // block reader by throwing InvalidBlockTokenException.
+          if (count.getAndIncrement() == 0) {
+            throw new InvalidBlockTokenException("Mock InvalidBlockTokenException");
+          }
+          return null;
+        }
+      }).when(injector).failCreateBlockReader();
+
+      try (DFSInputStream in = new DFSInputStream(fs.getClient(), file,
+          false, null)) {
+        int bufLen = 1024;
+        byte[] buf = new byte[bufLen];
+        // Seek to offset 1024, which is within the range (0, fileSize).
+        in.seek(1024);
+        int read = in.read(buf, 0, bufLen);
+        assertEquals(1024, read);
+      }
+    } finally {
+      DFSClientFaultInjector.set(oldFaultInjector);
+      IOUtils.closeStream(out);
+    }
+  }
 }