
Merging change r1036767 from trunk to federation

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1052@1089234 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 14 years ago
commit eeea995e70

+ 3 - 0
CHANGES.txt

@@ -814,6 +814,9 @@ Release 0.22.0 - Unreleased
 
     HDFS-981. test-contrib fails due to test-cactus failure (cos)
 
+    HDFS-1001. DataXceiver and BlockReader disagree on when to send/recv
+    CHECKSUM_OK. (bc Wong via eli)
+
 Release 0.21.1 - Unreleased
 
     HDFS-1411. Correct backup node startup command in hdfs user guide.

+ 1 - 1
src/java/org/apache/hadoop/hdfs/BlockReader.java

@@ -433,7 +433,7 @@ public class BlockReader extends FSInputChecker {
     return readFully(this, buf, offset, len);
   }
   
-  /* When the reader reaches end of a block and there are no checksum
+  /* When the reader reaches end of the read and there are no checksum
    * errors, we send OP_STATUS_CHECKSUM_OK to datanode to inform that 
    * checksum was verified and there was no error.
    */ 
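
For context, the acknowledgement this comment describes is a single two-byte status code written back on the DataNode socket once a clean read completes. A minimal client-side sketch, not part of this commit, assuming an illustrative value for the status constant (the real constant is defined in DataTransferProtocol, and the real send happens in BlockReader.checksumOk(Socket)):

// Sketch only: sending the optional 2-byte CHECKSUM_OK acknowledgement
// at the end of a clean read. The constant's value here is an assumption
// for illustration; the real one lives in DataTransferProtocol.
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

class ChecksumOkSendSketch {
  static final short OP_STATUS_CHECKSUM_OK = 6;  // assumed value

  static void sendChecksumOk(Socket dnSock) throws IOException {
    DataOutputStream out = new DataOutputStream(dnSock.getOutputStream());
    out.writeShort(OP_STATUS_CHECKSUM_OK);  // the 2-byte optional response
    out.flush();
  }
}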

+ 3 - 1
src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -1850,10 +1850,12 @@ public class DataNode extends Configured
     "LastPacketInBlock" set to true or with a zero length. If there is 
     no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK:
     
-    Client optional response at the end of data transmission :
+    Client optional response at the end of data transmission of any length:
       +------------------------------+
       | 2 byte OP_STATUS_CHECKSUM_OK |
       +------------------------------+
+    The DataNode always checks for OP_STATUS_CHECKSUM_OK. It closes the
+    client connection if the response is absent.
     
     PACKET : Contains a packet header, checksum and data. Amount of data
     ======== carried is set by BUFFER_SIZE.
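
The server side of the same exchange can be sketched analogously; this illustrates the check described above, not the actual DataXceiver code. If the two bytes never arrive, the DataNode treats the read as unacknowledged and closes the client connection:

// Sketch only: the server-side counterpart. The DataNode reads the
// trailing 2-byte status; anything else means the client did not confirm
// the checksum, and the caller closes the connection.
import java.io.DataInputStream;
import java.io.IOException;
import java.net.Socket;

class ChecksumOkCheckSketch {
  static final short OP_STATUS_CHECKSUM_OK = 6;  // assumed value

  static boolean gotChecksumOk(Socket clientSock) {
    try {
      DataInputStream in = new DataInputStream(clientSock.getInputStream());
      return in.readShort() == OP_STATUS_CHECKSUM_OK;
    } catch (IOException e) {
      return false;  // absent or truncated: caller closes the connection
    }
  }
}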

+ 151 - 0
src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java

@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.net.Socket;
+import java.net.InetSocketAddress;
+import java.io.DataOutputStream;
+import java.util.Random;
+import java.util.List;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.net.NetUtils;
+
+import static org.junit.Assert.*;
+
+/**
+ * A helper class to set up the cluster and to get a BlockReader and the DataNode for a given block.
+ */
+public class BlockReaderTestUtil {
+
+  private HdfsConfiguration conf = null;
+  private MiniDFSCluster cluster = null;
+
+  /**
+   * Set up the cluster.
+   */
+  public BlockReaderTestUtil(int replicationFactor) throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setInt("dfs.replication", replicationFactor);
+    cluster = new MiniDFSCluster.Builder(conf).format(true).build();
+    cluster.waitActive();
+  }
+
+  /**
+   * Shut down the cluster.
+   */
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  public MiniDFSCluster getCluster() {
+    return cluster;
+  }
+
+  public HdfsConfiguration getConf() {
+    return conf;
+  }
+
+  /**
+   * Create a file of the given size filled with random data.
+   * @return  List of Blocks of the new file.
+   */
+  public List<LocatedBlock> writeFile(Path filepath, int sizeKB)
+      throws IOException {
+    FileSystem fs = cluster.getFileSystem();
+
+    // Write a file with the specified amount of data
+    DataOutputStream os = fs.create(filepath);
+    byte data[] = new byte[1024];
+    new Random().nextBytes(data);
+    for (int i = 0; i < sizeKB; i++) {
+      os.write(data);
+    }
+    os.close();
+
+    // Return the blocks we just wrote
+    DFSClient dfsclient = new DFSClient(
+      new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
+    return dfsclient.getNamenode().getBlockLocations(
+      filepath.toString(), 0, sizeKB * 1024).getLocatedBlocks();
+  }
+
+
+  /**
+   * Exercise the BlockReader by reading the given number of bytes;
+   * the bytes read are not verified.
+   * @param expectEof if true, expect an EOF on the read when done
+   */
+  public void readAndCheckEOS(BlockReader reader, int length, boolean expectEof)
+      throws IOException {
+    byte buf[] = new byte[1024];
+    int nRead = 0;
+    while (nRead < length) {
+      DFSClient.LOG.info("So far read " + nRead + " - going to read more.");
+      int n = reader.read(buf, 0, buf.length);
+      assertTrue(n > 0);
+      nRead += n;
+    }
+
+    if (expectEof) {
+      DFSClient.LOG.info("Done reading, expect EOF for next read.");
+      assertEquals(-1, reader.read(buf, 0, buf.length));
+    }
+  }
+
+  /**
+   * Get a BlockReader for the given block.
+   */
+  public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead)
+      throws IOException {
+    InetSocketAddress targetAddr = null;
+    Socket sock = null;
+    ExtendedBlock block = testBlock.getBlock();
+    DatanodeInfo[] nodes = testBlock.getLocations();
+    targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
+    sock = new Socket();
+    sock.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
+    sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
+
+    return BlockReader.newBlockReader(
+      sock, targetAddr.toString() + ":" + block.getBlockId(), block,
+      testBlock.getBlockToken(), 
+      offset, lenToRead,
+      conf.getInt("io.file.buffer.size", 4096));
+  }
+
+  /**
+   * Get a DataNode that serves our testBlock.
+   */
+  public DataNode getDataNode(LocatedBlock testBlock) {
+    DatanodeInfo[] nodes = testBlock.getLocations();
+    int ipcport = nodes[0].ipcPort;
+    return cluster.getDataNode(ipcport);
+  }
+
+}
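
A short usage sketch of the new helper, assuming a caller in the same org.apache.hadoop.hdfs package; the refactored TestClientBlockVerification below does essentially this under JUnit:

// Sketch only: driving BlockReaderTestUtil end to end.
package org.apache.hadoop.hdfs;

import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

public class BlockReaderTestUtilUsageSketch {
  public static void main(String[] args) throws Exception {
    BlockReaderTestUtil util = new BlockReaderTestUtil(1);  // replication factor 1
    try {
      // Write 256 KB of random data and take the first block.
      List<LocatedBlock> blocks = util.writeFile(new Path("/demo.file"), 256);
      LocatedBlock blk = blocks.get(0);
      // Read the whole range and expect EOF on the next read.
      BlockReader reader = util.getBlockReader(blk, 0, 256 * 1024);
      util.readAndCheckEOS(reader, 256 * 1024, true);
      reader.close();
    } finally {
      util.shutdown();
    }
  }
}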

+ 15 - 87
src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java

@@ -18,21 +18,11 @@
 
 package org.apache.hadoop.hdfs;
 
-import java.net.Socket;
-import java.net.InetSocketAddress;
-import java.io.DataOutputStream;
-import java.util.Random;
 import java.util.List;
-import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.net.NetUtils;
 
 import org.junit.Test;
 import org.junit.AfterClass;
@@ -44,57 +34,18 @@ import static org.mockito.Mockito.never;
 import static org.junit.Assert.*;
 
 public class TestClientBlockVerification {
-  static MiniDFSCluster cluster = null;
-  static Configuration conf = null;
-  static FileSystem fs = null;
+
+  static BlockReaderTestUtil util = null;
   static final Path TEST_FILE = new Path("/test.file");
   static final int FILE_SIZE_K = 256;
   static LocatedBlock testBlock = null;
 
   @BeforeClass
   public static void setupCluster() throws Exception {
-    conf = new HdfsConfiguration();
-    int numDataNodes = 1;
-    conf.setInt("dfs.replication", numDataNodes);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
-    cluster.waitActive();
-    fs = cluster.getFileSystem();
-
-    // Write a file with 256K of data
-    DataOutputStream os = fs.create(TEST_FILE);
-    byte data[] = new byte[1024];
-    new Random().nextBytes(data);
-    for (int i = 0; i < FILE_SIZE_K; i++) {
-      os.write(data);
-    }
-    os.close();
-
-    // Locate the block we just wrote
-    DFSClient dfsclient = new DFSClient(
-      new InetSocketAddress("localhost",
-                            cluster.getNameNodePort()), conf);
-    List<LocatedBlock> locatedBlocks = dfsclient.getNamenode().getBlockLocations(
-      TEST_FILE.toString(), 0, FILE_SIZE_K * 1024).getLocatedBlocks();
-    testBlock = locatedBlocks.get(0); // first block
-  }
-
-  private BlockReader getBlockReader(
-    int offset, int lenToRead) throws IOException {
-    InetSocketAddress targetAddr = null;
-    Socket s = null;
-    ExtendedBlock block = testBlock.getBlock();
-    DatanodeInfo[] nodes = testBlock.getLocations();
-    targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
-    s = new Socket();
-    s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
-    s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
-
-    String file = BlockReader.getFileName(targetAddr,
-        "test-blockpoolid",
-        block.getBlockId());
-    return BlockReader.newBlockReader(s, file, block,
-        testBlock.getBlockToken(), offset, lenToRead, conf.getInt(
-            "io.file.buffer.size", 4096));
+    final int REPLICATION_FACTOR = 1;
+    util = new BlockReaderTestUtil(REPLICATION_FACTOR);
+    List<LocatedBlock> blkList = util.writeFile(TEST_FILE, FILE_SIZE_K);
+    testBlock = blkList.get(0);     // Use the first block to test
   }
 
   /**
@@ -102,8 +53,8 @@ public class TestClientBlockVerification {
    */
   @Test
   public void testBlockVerification() throws Exception {
-    BlockReader reader = spy(getBlockReader(0, FILE_SIZE_K * 1024));
-    slurpReader(reader, FILE_SIZE_K * 1024, true);
+    BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
+    util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
     verify(reader).checksumOk(reader.dnSock);
     reader.close();
   }
@@ -113,8 +64,8 @@ public class TestClientBlockVerification {
    */
   @Test
   public void testIncompleteRead() throws Exception {
-    BlockReader reader = spy(getBlockReader(0, FILE_SIZE_K * 1024));
-    slurpReader(reader, FILE_SIZE_K / 2 * 1024, false);
+    BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
+    util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false);
 
     // We asked the blockreader for the whole file, and only read
     // half of it, so no checksumOk
@@ -130,9 +81,9 @@ public class TestClientBlockVerification {
   @Test
   public void testCompletePartialRead() throws Exception {
     // Ask for half the file
-    BlockReader reader = spy(getBlockReader(0, FILE_SIZE_K * 1024 / 2));
+    BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
     // And read half the file
-    slurpReader(reader, FILE_SIZE_K * 1024 / 2, true);
+    util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
     verify(reader).checksumOk(reader.dnSock);
     reader.close();
   }
@@ -149,8 +100,8 @@ public class TestClientBlockVerification {
       for (int length : lengths) {
         DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " +
                            " len=" + length);
-        BlockReader reader = spy(getBlockReader(startOffset, length));
-        slurpReader(reader, length, true);
+        BlockReader reader = spy(util.getBlockReader(testBlock, startOffset, length));
+        util.readAndCheckEOS(reader, length, true);
         verify(reader).checksumOk(reader.dnSock);
         reader.close();
       }
@@ -158,32 +109,9 @@ public class TestClientBlockVerification {
   }
 
 
-  /**
-   * Read the given length from the given block reader.
-   *
-   * @param expectEOF if true, will expect an eof response when done
-   */
-  private void slurpReader(BlockReader reader, int length, boolean expectEof)
-    throws IOException {
-    byte buf[] = new byte[1024];
-    int nRead = 0;
-    while (nRead < length) {
-      DFSClient.LOG.info("So far read " + nRead + " - going to read more.");
-      int n = reader.read(buf, 0, buf.length);
-      assertTrue(n > 0);
-      nRead += n;
-    }
-    DFSClient.LOG.info("Done reading, expect EOF for next read.");
-    if (expectEof) {
-      assertEquals(-1, reader.read(buf, 0, buf.length));
-    }
-  }
-
   @AfterClass
   public static void teardownCluster() throws Exception {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
+    util.shutdown();
   }
 
 }