Browse Source

HDFS-6758: Merging r1619275 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1619276 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal 10 years ago
parent
commit
f7e3b6e761

+ 4 - 1
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -154,7 +154,10 @@ Release 2.6.0 - UNRELEASED
     hadoop.security.saslproperties.resolver.class. (Benoy Antony via cnauroth)
 
     HDFS-6878. Change MiniDFSCluster to support StorageType configuration
-    for individual directories (Arpit Agarwal)
+    for individual directories. (Arpit Agarwal)
+
+    HDFS-6758. block writer should pass the expected block size to
+    DataXceiverServer. (Arpit Agarwal)
 
   OPTIMIZATIONS
 

+ 7 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

@@ -1339,8 +1339,14 @@ public class DFSOutputStream extends FSOutputSummer
           //
   
           BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage;
+
+          // We cannot change the block length in 'block' as it counts the number
+          // of bytes ack'ed.
+          ExtendedBlock blockCopy = new ExtendedBlock(block);
+          blockCopy.setNumBytes(blockSize);
+
           // send the request
-          new Sender(out).writeBlock(block, nodeStorageTypes[0], accessToken,
+          new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
               dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, 
               nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
               cachingStrategy.get());

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

@@ -576,7 +576,9 @@ class DataXceiver extends Receiver implements Runnable {
     // forward the original version of the block to downstream mirrors, so
     // make a copy here.
     final ExtendedBlock originalBlock = new ExtendedBlock(block);
-    block.setNumBytes(dataXceiverServer.estimateBlockSize);
+    if (block.getNumBytes() == 0) {
+      block.setNumBytes(dataXceiverServer.estimateBlockSize);
+    }
     LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: "
         + localAddress);
 

+ 2 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java

@@ -100,11 +100,8 @@ class DataXceiverServer implements Runnable {
   
   /**
    * We need an estimate for block size to check if the disk partition has
-   * enough space. For now we set it to be the default block size set
-   * in the server side configuration, which is not ideal because the
-   * default block size should be a client-size configuration. 
-   * A better solution is to include in the header the estimated block size,
-   * i.e. either the actual block size or the default block size.
+   * enough space. Newer clients pass the expected block size to the DataNode.
+   * For older clients we just use the server-side default block size.
    */
   final long estimateBlockSize;
   

+ 106 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java

@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.*;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.*;
+
+
+/**
+ * Test to verify that the DFSClient passes the expected block length to
+ * the DataNode via DataTransferProtocol.
+ */
+public class TestWriteBlockGetsBlockLengthHint {
+  static final long DEFAULT_BLOCK_LENGTH = 1024;
+  static final long EXPECTED_BLOCK_LENGTH = DEFAULT_BLOCK_LENGTH * 2;
+
+  @Test
+  public void blockLengthHintIsPropagated() throws IOException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final Path path = new Path("/" + METHOD_NAME + ".dat");
+
+    Configuration conf = new HdfsConfiguration();
+    FsDatasetChecker.setFactory(conf);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_LENGTH);
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+
+    try {
+      cluster.waitActive();
+
+      // FsDatasetChecker#createRbw asserts during block creation if the test
+      // fails.
+      DFSTestUtil.createFile(
+          cluster.getFileSystem(),
+          path,
+          4096,  // Buffer size.
+          EXPECTED_BLOCK_LENGTH,
+          EXPECTED_BLOCK_LENGTH,
+          (short) 1,
+          0x1BAD5EED);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  static class FsDatasetChecker extends SimulatedFSDataset {
+    static class Factory extends FsDatasetSpi.Factory<SimulatedFSDataset> {
+      @Override
+      public SimulatedFSDataset newInstance(DataNode datanode,
+          DataStorage storage, Configuration conf) throws IOException {
+        return new FsDatasetChecker(storage, conf);
+      }
+
+      @Override
+      public boolean isSimulated() {
+        return true;
+      }
+    }
+
+    public static void setFactory(Configuration conf) {
+      conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+               Factory.class.getName());
+    }
+
+    public FsDatasetChecker(DataStorage storage, Configuration conf) {
+      super(storage, conf);
+    }
+
+    /**
+     * Override createRbw to verify that the block length that is passed
+     * is correct. This requires both DFSOutputStream and BlockReceiver to
+     * correctly propagate the hint to FsDatasetSpi.
+     */
+    @Override
+    public synchronized ReplicaInPipelineInterface createRbw(
+        StorageType storageType, ExtendedBlock b) throws IOException {
+      assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH));
+      return super.createRbw(storageType, b);
+    }
+  }
+}