소스 검색

HDFS-13121. NPE when request file descriptors when SC read. Contributed by Zsolt Venczel.

(cherry picked from commit 0247cb6318507afe06816e337a19f396afc53efa)
Wei-Chiu Chuang 6 년 전
부모
커밋
f5f4d0b7e7

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java

@@ -598,6 +598,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
       sock.recvFileInputStreams(fis, buf, 0, buf.length);
       ShortCircuitReplica replica = null;
       try {
+        if (fis[0] == null || fis[1] == null) {
+          throw new IOException("the datanode " + datanode + " failed to " +
+              "pass a file descriptor (might have reached open file limit).");
+        }
+
         ExtendedBlockId key =
             new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
         if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {

+ 89 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java

@@ -42,6 +42,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.ClientContext;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.PeerCache;
 import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
 import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.DFSInputStream;
@@ -50,10 +54,12 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.net.DomainPeer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
@@ -66,9 +72,11 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Time;
@@ -819,4 +827,85 @@ public class TestShortCircuitCache {
         .fetch(Mockito.eq(extendedBlockId), Mockito.any());
     }
   }
+
+  @Test
+  public void testRequestFileDescriptorsWhenULimit() throws Exception {
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf(
+        "testRequestFileDescriptorsWhenULimit", sockDir);
+
+    final short replicas = 1;
+    final int fileSize = 3;
+    final String testFile = "/testfile";
+
+    try (MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(replicas).build()) {
+
+      cluster.waitActive();
+
+      DistributedFileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, new Path(testFile), fileSize, replicas, 0L);
+
+      LocatedBlock blk = new DFSClient(DFSUtilClient.getNNAddress(conf), conf)
+          .getLocatedBlocks(testFile, 0, fileSize).get(0);
+
+      ClientContext clientContext = Mockito.mock(ClientContext.class);
+      Mockito.when(clientContext.getPeerCache()).thenAnswer(
+          (Answer<PeerCache>) peerCacheCall -> {
+            PeerCache peerCache = new PeerCache(10, Long.MAX_VALUE);
+            DomainPeer peer = Mockito.spy(getDomainPeerToDn(conf));
+            peerCache.put(blk.getLocations()[0], peer);
+
+            Mockito.when(peer.getDomainSocket()).thenAnswer(
+                (Answer<DomainSocket>) domainSocketCall -> {
+                  DomainSocket domainSocket = Mockito.mock(DomainSocket.class);
+                  Mockito.when(domainSocket
+                      .recvFileInputStreams(
+                          Mockito.any(FileInputStream[].class),
+                          Mockito.any(byte[].class),
+                          Mockito.anyInt(),
+                          Mockito.anyInt())
+                  ).thenAnswer(
+                      // we are mocking the FileOutputStream array with nulls
+                      (Answer<Void>) recvFileInputStreamsCall -> null
+                  );
+                  return domainSocket;
+                }
+            );
+
+            return peerCache;
+          });
+
+      Mockito.when(clientContext.getShortCircuitCache()).thenAnswer(
+          (Answer<ShortCircuitCache>) shortCircuitCacheCall -> {
+            ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class);
+            Mockito.when(cache.allocShmSlot(
+                Mockito.any(DatanodeInfo.class),
+                Mockito.any(DomainPeer.class),
+                Mockito.any(MutableBoolean.class),
+                Mockito.any(ExtendedBlockId.class),
+                Mockito.anyString()))
+                .thenAnswer((Answer<Slot>) call -> null);
+
+            return cache;
+          }
+      );
+
+      DatanodeInfo[] nodes = blk.getLocations();
+
+      try {
+        Assert.assertNull(new BlockReaderFactory(new DfsClientConf(conf))
+            .setInetSocketAddress(NetUtils.createSocketAddr(nodes[0]
+                .getXferAddr()))
+            .setClientCacheContext(clientContext)
+            .setDatanodeInfo(blk.getLocations()[0])
+            .setBlock(blk.getBlock())
+            .setBlockToken(new Token())
+            .createShortCircuitReplicaInfo());
+      } catch (NullPointerException ex) {
+        Assert.fail("Should not throw NPE when the native library is unable " +
+            "to create new files!");
+      }
+    }
+  }
 }