
HADOOP-1628. Add block CRC protocol unit tests. Contributed by Raghu Angadi.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@559623 13f79535-47bb-0310-9956-ffa450edef68
Owen O'Malley 18 years ago
parent
commit
cb3e358e6e

+ 2 - 0
CHANGES.txt

@@ -413,6 +413,8 @@ Branch 0.14 (unreleased changes)
 137. HADOOP-1587.  Fix TestSymLink to get required system properties.
      (Devaraj Das via omalley)
 
+138. HADOOP-1628.  Add block CRC protocol unit tests. (Raghu Angadi via omalley)
+
 Release 0.13.0 - 2007-06-08
 
  1. HADOOP-1047.  Fix TestReplication to succeed more reliably.

+ 1 - 1
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -1665,7 +1665,7 @@ class DFSClient implements FSConstants {
           numSuccessfulWrites++;
 
           //We should wait for response from the receiver.
-          int reply = blockReplyStream.readByte();
+          int reply = blockReplyStream.readShort();
           if ( reply == OP_STATUS_SUCCESS ||
               ( reply == OP_STATUS_ERROR_EXISTS &&
                   numSuccessfulWrites > 1 ) ) {

+ 8 - 4
src/java/org/apache/hadoop/dfs/DataNode.java

@@ -875,7 +875,7 @@ public class DataNode implements FSConstants, Runnable {
         long blockLen = 0;
         long lastOffset = 0;
         long lastLen = 0;
-        int status = -1;
+        short status = -1;
         boolean headerWritten = false;
         
         while ( true ) {
@@ -1006,9 +1006,9 @@ public class DataNode implements FSConstants, Runnable {
           if ( mirrorOut != null ) {
             //Wait for the remote reply
             mirrorOut.flush();
-            byte result = OP_STATUS_ERROR; 
+            short result = OP_STATUS_ERROR; 
             try {
-              result = mirrorIn.readByte();
+              result = mirrorIn.readShort();
             } catch ( IOException ignored ) {}
 
             msg += " and " +  (( result != OP_STATUS_SUCCESS ) ? 
@@ -1023,7 +1023,7 @@ public class DataNode implements FSConstants, Runnable {
             
         if ( status >= 0 ) {
           try {
-            reply.writeByte( status );
+            reply.writeShort( status );
             reply.flush();
           } catch ( IOException ignored ) {}
         }
@@ -1133,6 +1133,10 @@ public class DataNode implements FSConstants, Runnable {
 
       int bytesPerChecksum = checksum.getBytesPerChecksum();
       int checksumSize = checksum.getChecksumSize();
+      
+      if (length < 0) {
+        length = data.getLength(block);
+      }
 
       long endOffset = data.getLength( block );
       if ( startOffset < 0 || startOffset > endOffset ||

+ 0 - 2
src/java/org/apache/hadoop/dfs/FSConstants.java

@@ -26,8 +26,6 @@ import org.apache.hadoop.conf.Configuration;
 public interface FSConstants {
   public static int MIN_BLOCKS_FOR_WRITE = 5;
 
-  public static final long WRITE_COMPLETE = 0xcafae11a;
-
   //
   // IPC Opcodes 
   //

+ 235 - 0
src/test/org/apache/hadoop/dfs/TestDataTransferProtocol.java

@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.Random;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.dfs.DFSClient.DFSDataInputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This tests data transfer protocol handling in the Datanode. It sends
+ * various forms of wrong data and verifies that the Datanode handles them correctly.
+ */
+public class TestDataTransferProtocol extends TestCase {
+  
+  private static final Log LOG = LogFactory.getLog(
+                    "org.apache.hadoop.dfs.TestDataTransferProtocol");
+  
+  DatanodeID datanode;
+  InetSocketAddress dnAddr;
+  byte[] sendBuf = new byte[128];
+  byte[] recvBuf = new byte[128];
+  ByteBuffer byteBuf = ByteBuffer.wrap(sendBuf);
+  ByteBuffer recvByteBuf = ByteBuffer.wrap(recvBuf);
+  
+  private void sendRecvData(String testDescription,
+                            boolean eofExpected) throws IOException {
+    /* Opens a socket to the datanode and
+     * sends the data in sendBuf.
+     * If there is data in recvBuf, expects to receive data
+     *     from the datanode that matches recvBuf.
+     * If an EOFException occurs while receiving, it is rethrown
+     *     only if eofExpected is false.
+     */
+    
+    Socket sock = null;
+    try {
+      
+      if ( testDescription != null ) {
+        LOG.info("Testing : " + testDescription);
+      }
+      sock = new Socket();
+      sock.connect(dnAddr, FSConstants.READ_TIMEOUT);
+      sock.setSoTimeout(FSConstants.READ_TIMEOUT);
+      
+      OutputStream out = sock.getOutputStream();
+      // send the request bytes accumulated in sendBuf
+      out.write(sendBuf, 0, byteBuf.position());
+      byte[] retBuf = new byte[recvByteBuf.position()];
+      
+      DataInputStream in = new DataInputStream(sock.getInputStream());
+      try {
+        in.readFully(retBuf);
+      } catch (EOFException eof) {
+        if ( eofExpected ) {
+          LOG.info("Got EOF as expected.");
+          return;
+        }
+        throw eof;
+      }
+      
+      if (eofExpected) {
+        throw new IOException("Did not receive an exception when one " +
+                              "was expected while reading from " + 
+                              datanode.getName());
+      }
+      
+      for (int i=0; i<retBuf.length; i++) {
+        assertEquals("checking byte[" + i + "]", recvBuf[i], retBuf[i]);
+      }
+    } finally {
+      FileUtil.closeSocket(sock);
+    }
+  }
+  
+  void createFile(FileSystem fs, Path path, int fileLen) throws IOException {
+    byte [] arr = new byte[fileLen];
+    FSDataOutputStream out = fs.create(path);
+    out.write(arr);
+    out.close();
+  }
+  
+  void readFile(FileSystem fs, Path path, int fileLen) throws IOException {
+    byte [] arr = new byte[fileLen];
+    FSDataInputStream in = fs.open(path);
+    in.readFully(arr);
+  }
+  
+  Block getFirstBlock(FileSystem fs, Path path) throws IOException {
+    DFSDataInputStream in = 
+      (DFSDataInputStream) ((DistributedFileSystem)fs).open(path);
+    in.readByte();
+    return in.getCurrentBlock();
+  }
+  
+  public void testDataTransferProtocol() throws IOException {
+    Random random = new Random();
+    int oneMil = 1024*1024;
+    Path file = new Path("dataprotocol.dat");
+    int numDataNodes = 1;
+    
+    Configuration conf = new Configuration();
+    conf.setInt("dfs.replication", numDataNodes); 
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+    cluster.waitActive();
+    DFSClient dfsClient = new DFSClient(
+                 new InetSocketAddress("localhost", cluster.getNameNodePort()),
+                 conf);                
+    datanode = dfsClient.datanodeReport()[0];
+    dnAddr = DataNode.createSocketAddr(datanode.getName());
+    FileSystem fileSys = cluster.getFileSystem();
+    
+    int fileLen = Math.min(conf.getInt("dfs.block.size", 4096), 4096);
+    
+    createFile(fileSys, file, fileLen);
+
+    // get the first blockid for the file
+    Block firstBlock = getFirstBlock(fileSys, file);
+    long newBlockId = firstBlock.getBlockId() + 1;
+
+    recvByteBuf.position(1);
+    byteBuf.position(0);
+    
+    int versionPos = 0;
+    byteBuf.putShort((short)(FSConstants.DATA_TRANFER_VERSION-1));
+    sendRecvData("Wrong Version", true);
+    // correct the version
+    byteBuf.putShort(versionPos, (short)FSConstants.DATA_TRANFER_VERSION);
+    
+    int opPos = byteBuf.position();
+    byteBuf.put((byte)(FSConstants.OP_WRITE_BLOCK-1));
+    sendRecvData("Wrong Op Code", true);
+    
+    /* Test OP_WRITE_BLOCK */
+    
+    byteBuf.position(opPos);
+    // Initially write correct values
+    byteBuf.put((byte)FSConstants.OP_WRITE_BLOCK);
+    int blockPos = byteBuf.position();
+    byteBuf.putLong(newBlockId);
+    int targetPos = byteBuf.position();
+    byteBuf.putInt(0);
+    int checksumPos = byteBuf.position();
+    byteBuf.put((byte)DataChecksum.CHECKSUM_CRC32);
+    
+    byteBuf.putInt(-1-random.nextInt(oneMil));
+    sendRecvData("wrong bytesPerChecksum while writing", true);
+    byteBuf.putInt(checksumPos+1, 512);
+    
+    byteBuf.putInt(targetPos, -1-random.nextInt(oneMil));
+    sendRecvData("bad targets len while writing", true);
+    byteBuf.putInt(targetPos, 0);
+    
+    byteBuf.putLong(blockPos, ++newBlockId);
+    int dataChunkPos = byteBuf.position();
+    byteBuf.putInt(-1-random.nextInt(oneMil));
+    recvByteBuf.position(0);
+    recvByteBuf.putShort((short)FSConstants.OP_STATUS_ERROR);//err ret expected.
+    sendRecvData("negative DATA_CHUNK len while writing", false);
+    byteBuf.putInt(dataChunkPos, 0);
+    
+    byteBuf.putInt(0); // zero checksum
+    byteBuf.putLong(blockPos, ++newBlockId);    
+    //ok finally write a block with 0 len
+    recvByteBuf.putShort(0, (short)FSConstants.OP_STATUS_SUCCESS);
+    sendRecvData("Writing a zero len block", false);
+    
+    
+    /* Test OP_READ_BLOCK */
+    
+    byteBuf.position(opPos);
+    byteBuf.put((byte)FSConstants.OP_READ_BLOCK);
+    blockPos = byteBuf.position();
+    newBlockId = firstBlock.getBlockId()-1;
+    byteBuf.putLong(newBlockId);
+    int startOffsetPos = byteBuf.position();
+    byteBuf.putLong(0L);
+    int lenPos = byteBuf.position();
+    byteBuf.putLong(fileLen);
+    /* We should change DataNode to return ERROR_INVALID instead of closing 
+     * the connection.
+     */
+    sendRecvData("Wrong block ID for read", true); 
+    byteBuf.putLong(blockPos, firstBlock.getBlockId());
+    
+    recvByteBuf.position(0);
+    recvByteBuf.putShort((short)FSConstants.OP_STATUS_ERROR_INVALID);
+    byteBuf.putLong(startOffsetPos, -1-random.nextInt(oneMil));
+    sendRecvData("Negative start-offset for read", false);
+    
+    byteBuf.putLong(startOffsetPos, fileLen);
+    sendRecvData("Wrong start-offset for read", false);
+    byteBuf.putLong(startOffsetPos, 0);
+    
+    // negative length is ok. Datanode assumes we want to read the whole block.
+    recvByteBuf.putShort(0, (short)FSConstants.OP_STATUS_SUCCESS);    
+    byteBuf.putLong(lenPos, -1-random.nextInt(oneMil));
+    sendRecvData("Negative length for read", false);
+    
+    recvByteBuf.putShort(0, (short)FSConstants.OP_STATUS_ERROR_INVALID);
+    byteBuf.putLong(lenPos, fileLen+1);
+    sendRecvData("Wrong length for read", false);
+    byteBuf.putLong(lenPos, fileLen);
+    
+    //At the end of all this, read the file to make sure that succeeds finally.
+    readFile(fileSys, file, fileLen);
+  }
+}
+
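Note for readers reconstructing the wire format from the buffer offsets in the test above: the test writes, in order, a 2-byte transfer version, a 1-byte op code, and then the op-specific fields (for OP_READ_BLOCK: block id, start offset, and length, each as a long), and it expects the datanode's reply to begin with the 2-byte status this patch introduces. The sketch below is not part of the change; it rebuilds a well-formed OP_READ_BLOCK request with a plain DataOutputStream instead of the test's ByteBuffer, using only constants that appear in this diff (DATA_TRANFER_VERSION, OP_READ_BLOCK, OP_STATUS_SUCCESS, READ_TIMEOUT). The class and method names are illustrative only.

// Illustrative sketch only (not part of this patch): issue a well-formed
// OP_READ_BLOCK request against a datanode using the same field order the
// test writes into its ByteBuffer, then read the 2-byte status reply.
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

import org.apache.hadoop.dfs.FSConstants;

class ReadBlockRequestSketch {                          // hypothetical helper
  static boolean readBlock(InetSocketAddress dnAddr, long blockId,
                           long startOffset, long length) throws IOException {
    Socket sock = new Socket();
    try {
      sock.connect(dnAddr, FSConstants.READ_TIMEOUT);
      sock.setSoTimeout(FSConstants.READ_TIMEOUT);

      DataOutputStream out = new DataOutputStream(sock.getOutputStream());
      out.writeShort(FSConstants.DATA_TRANFER_VERSION); // 2-byte protocol version
      out.writeByte(FSConstants.OP_READ_BLOCK);         // 1-byte op code
      out.writeLong(blockId);                           // block to read
      out.writeLong(startOffset);                       // offset within the block
      out.writeLong(length);                            // number of bytes requested
      out.flush();

      // The datanode now replies with a 2-byte status; on success the checksum
      // header and block data would follow (not read in this sketch).
      DataInputStream in = new DataInputStream(sock.getInputStream());
      return in.readShort() == FSConstants.OP_STATUS_SUCCESS;
    } finally {
      sock.close();
    }
  }
}

Deliberately corrupting any one of these fields, as the test does, should produce either an early EOF or a non-success status on this same reply path.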