
HDFS-11608. HDFS write crashed with block size greater than 2 GB. Contributed by Xiaobing Zhou.

(cherry picked from commit 0eacd4c13be9bad0fbed9421a6539c64bbda4df1)
Xiaoyu Yao · 8 years ago · commit 0391c92022
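
Root cause: adjustChunkBoundary() cast the remaining bytes in the block from long to int before taking the minimum with the configured write packet size, so once a block larger than 2 GB had more than Integer.MAX_VALUE bytes remaining, the cast wrapped negative and the computed packet size went negative. A minimal sketch of the overflow (class name is illustrative; the values come from the new runAdjustChunkBoundary test below):

    public class Hdfs11608OverflowDemo {
      public static void main(String[] args) {
        long blockSize = 3221225500L;      // block > 2 GB, as in the test
        long bytesCurBlock = 1073741824L;  // 1 GB already written to the block
        int writePacketSize = 64 * 1024;   // default dfs.client-write-packet-size

        // Before the fix: the long difference (2147483676) is cast to int
        // first, wrapping to -2147483620, so min() picks the negative value.
        int broken = Math.min((int) (blockSize - bytesCurBlock), writePacketSize);

        // After the fix: min() is taken on longs, so the result is bounded
        // by writePacketSize and the narrowing cast back to int is safe.
        int fixed = (int) Math.min(blockSize - bytesCurBlock, writePacketSize);

        System.out.println(broken + " vs " + fixed); // -2147483620 vs 65536
      }
    }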

+ 38 - 3
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
@@ -120,6 +121,7 @@ public class DFSOutputStream extends FSOutputSummer
   private final EnumSet<AddBlockFlag> addBlockFlags;
   protected final AtomicReference<CachingStrategy> cachingStrategy;
   private FileEncryptionInfo fileEncryptionInfo;
+  private int writePacketSize;
 
   /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
   protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
@@ -199,7 +201,9 @@ public class DFSOutputStream extends FSOutputSummer
       DFSClient.LOG.debug("Set non-null progress callback on DFSOutputStream "
           +"{}", src);
     }
-    
+
+    initWritePacketSize();
+
     this.bytesPerChecksum = checksum.getBytesPerChecksum();
     if (bytesPerChecksum <= 0) {
       throw new HadoopIllegalArgumentException(
@@ -214,6 +218,21 @@ public class DFSOutputStream extends FSOutputSummer
     this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
   }
 
+  /**
+   * Ensures the configured writePacketSize never exceeds
+   * PacketReceiver.MAX_PACKET_SIZE.
+   */
+  private void initWritePacketSize() {
+    writePacketSize = dfsClient.getConf().getWritePacketSize();
+    if (writePacketSize > PacketReceiver.MAX_PACKET_SIZE) {
+      LOG.warn(
+          "Configured write packet size exceeds the maximum of {} bytes;"
+              + " using {} bytes instead.",
+          PacketReceiver.MAX_PACKET_SIZE, PacketReceiver.MAX_PACKET_SIZE);
+      writePacketSize = PacketReceiver.MAX_PACKET_SIZE;
+    }
+  }
+
   /** Construct a new output stream for creating a file. */
   protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
       EnumSet<CreateFlag> flag, Progressable progress,
@@ -450,12 +469,28 @@ public class DFSOutputStream extends FSOutputSummer
     }
 
     if (!getStreamer().getAppendChunk()) {
-      int psize = Math.min((int)(blockSize- getStreamer().getBytesCurBlock()),
-          dfsClient.getConf().getWritePacketSize());
+      final int psize = (int) Math
+          .min(blockSize - getStreamer().getBytesCurBlock(), writePacketSize);
       computePacketChunkSize(psize, bytesPerChecksum);
     }
   }
 
+  /**
+   * Used in tests only.
+   */
+  @VisibleForTesting
+  void setAppendChunk(final boolean appendChunk) {
+    getStreamer().setAppendChunk(appendChunk);
+  }
+
+  /**
+   * Used in tests only.
+   */
+  @VisibleForTesting
+  void setBytesCurBlock(final long bytesCurBlock) {
+    getStreamer().setBytesCurBlock(bytesCurBlock);
+  }
+
   /**
    * if encountering a block boundary, send an empty packet to
    * indicate the end of block and reset bytesCurBlock.
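
For context on the assertions in the new test: computePacketChunkSize(psize, ...) packs as many whole chunks as fit in psize after the packet header, which is why the capped writePacketSize determines chunksPerPacket and packetSize. A hedged sketch that reproduces the numbers the test asserts (class name is illustrative; constants mirror the test: 512-byte chunks, 4-byte checksums, a 33-byte max header):

    public class PacketLayoutDemo {
      public static void main(String[] args) {
        int psize = 16 * 1024 * 1024; // writePacketSize after the 16 MiB cap
        int maxHeaderLength = 33;     // the test's packetMaxHeaderLength
        int chunkSize = 512 + 4;      // bytesPerChecksum + checksumSize

        int bodySize = psize - maxHeaderLength;        // 16777183
        int chunksPerPacket = bodySize / chunkSize;    // 32513
        int packetSize = chunksPerPacket * chunkSize;  // 16776708, just under 16 MiB

        System.out.println(chunksPerPacket + " chunks, " + packetSize + " bytes");
      }
    }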

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java

@@ -45,7 +45,7 @@ public class PacketReceiver implements Closeable {
    * The max size of any single packet. This prevents OOMEs when
    * invalid data is sent.
    */
-  private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
+  public static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
 
   static final Logger LOG = LoggerFactory.getLogger(PacketReceiver.class);
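
Making MAX_PACKET_SIZE public lets the client cap writePacketSize to the same limit the receive path enforces; as the comment above notes, the receiver uses this bound to reject invalid lengths before allocating buffers. An illustrative sketch of that validation pattern, not the exact PacketReceiver code:

    // Illustrative only: reject nonsensical lengths before allocating
    // a packet buffer, preventing OOMEs on corrupt or malicious input.
    static void checkPacketLength(int totalLen) throws IOException {
      if (totalLen < 0 || totalLen > MAX_PACKET_SIZE) {
        throw new IOException("Incorrect packet length " + totalLen
            + "; must be in [0, " + MAX_PACKET_SIZE + "]");
      }
    }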
 

+ 126 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java

@@ -18,8 +18,10 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.EnumSet;
@@ -41,10 +43,13 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.PathUtils;
 import org.apache.htrace.core.SpanId;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -64,6 +69,9 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+
 public class TestDFSOutputStream {
   static MiniDFSCluster cluster;
 
@@ -133,6 +141,124 @@ public class TestDFSOutputStream {
     Assert.assertTrue((Integer) field.get(dos) + 257 < packetSize);
   }
 
+  /**
+   * This tests preventing overflows of packet size and bodySize.
+   * <p>
+   * See also https://issues.apache.org/jira/browse/HDFS-11608.
+   * </p>
+   * @throws IOException
+   * @throws SecurityException
+   * @throws NoSuchFieldException
+   * @throws InvocationTargetException
+   * @throws IllegalArgumentException
+   * @throws IllegalAccessException
+   * @throws NoSuchMethodException
+   */
+  @Test(timeout=60000)
+  public void testPreventOverflow() throws IOException, NoSuchFieldException,
+      SecurityException, IllegalAccessException, IllegalArgumentException,
+      InvocationTargetException, NoSuchMethodException {
+
+    final int defaultWritePacketSize = DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
+    int configuredWritePacketSize = defaultWritePacketSize;
+    int finalWritePacketSize = defaultWritePacketSize;
+
+    /* test default WritePacketSize, e.g. 64*1024 */
+    runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize);
+
+    /* test large WritePacketSize, e.g. 1G */
+    configuredWritePacketSize = 1000 * 1024 * 1024;
+    finalWritePacketSize = PacketReceiver.MAX_PACKET_SIZE;
+    runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize);
+  }
+
+  /**
+   * @param configuredWritePacketSize the configured WritePacketSize.
+   * @param finalWritePacketSize the final WritePacketSize picked by
+   *        {@link DFSOutputStream#adjustChunkBoundary}.
+   */
+  private void runAdjustChunkBoundary(
+      final int configuredWritePacketSize,
+      final int finalWritePacketSize) throws IOException, NoSuchFieldException,
+      SecurityException, IllegalAccessException, IllegalArgumentException,
+      InvocationTargetException, NoSuchMethodException {
+
+    final boolean appendChunk = false;
+    final long blockSize = 3221225500L;
+    final long bytesCurBlock = 1073741824L;
+    final int bytesPerChecksum = 512;
+    final int checksumSize = 4;
+    final int chunkSize = bytesPerChecksum + checksumSize;
+    final int packetMaxHeaderLength = 33;
+
+    MiniDFSCluster dfsCluster = null;
+    final File baseDir = new File(PathUtils.getTestDir(getClass()),
+        GenericTestUtils.getMethodName());
+
+    try {
+      final Configuration dfsConf = new Configuration();
+      dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
+          baseDir.getAbsolutePath());
+      dfsConf.setInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
+          configuredWritePacketSize);
+      dfsCluster = new MiniDFSCluster.Builder(dfsConf).numDataNodes(1).build();
+      dfsCluster.waitActive();
+
+      final FSDataOutputStream os = dfsCluster.getFileSystem()
+          .create(new Path(baseDir.getAbsolutePath(), "testPreventOverflow"));
+      final DFSOutputStream dos = (DFSOutputStream) Whitebox
+          .getInternalState(os, "wrappedStream");
+
+      /* set appendChunk */
+      final Method setAppendChunkMethod = dos.getClass()
+          .getDeclaredMethod("setAppendChunk", boolean.class);
+      setAppendChunkMethod.setAccessible(true);
+      setAppendChunkMethod.invoke(dos, appendChunk);
+
+      /* set bytesCurBlock */
+      final Method setBytesCurBlockMethod = dos.getClass()
+          .getDeclaredMethod("setBytesCurBlock", long.class);
+      setBytesCurBlockMethod.setAccessible(true);
+      setBytesCurBlockMethod.invoke(dos, bytesCurBlock);
+
+      /* set blockSize */
+      final Field blockSizeField = dos.getClass().getDeclaredField("blockSize");
+      blockSizeField.setAccessible(true);
+      blockSizeField.setLong(dos, blockSize);
+
+      /* call adjustChunkBoundary */
+      final Method method = dos.getClass()
+          .getDeclaredMethod("adjustChunkBoundary");
+      method.setAccessible(true);
+      method.invoke(dos);
+
+      /* get and verify writePacketSize */
+      final Field writePacketSizeField = dos.getClass()
+          .getDeclaredField("writePacketSize");
+      writePacketSizeField.setAccessible(true);
+      Assert.assertEquals(finalWritePacketSize,
+          writePacketSizeField.getInt(dos));
+
+      /* get and verify chunksPerPacket */
+      final Field chunksPerPacketField = dos.getClass()
+          .getDeclaredField("chunksPerPacket");
+      chunksPerPacketField.setAccessible(true);
+      Assert.assertEquals(
+          (finalWritePacketSize - packetMaxHeaderLength) / chunkSize,
+          chunksPerPacketField.getInt(dos));
+
+      /* get and verify packetSize */
+      final Field packetSizeField = dos.getClass()
+          .getDeclaredField("packetSize");
+      packetSizeField.setAccessible(true);
+      Assert.assertEquals(chunksPerPacketField.getInt(dos) * chunkSize,
+          packetSizeField.getInt(dos));
+    } finally {
+      if (dfsCluster != null) {
+        dfsCluster.shutdown();
+      }
+    }
+  }
+
   @Test
   public void testCongestionBackoff() throws IOException {
     DfsClientConf dfsClientConf = mock(DfsClientConf.class);