|
@@ -34,9 +34,10 @@ import org.apache.hadoop.util.DataChecksum;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
|
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|
|
-import org.apache.hadoop.hdfs.protocol.FSConstants;
|
|
|
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
|
|
|
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
|
|
@@ -75,8 +76,8 @@ public class TestDataTransferProtocol extends TestCase {
|
|
|
LOG.info("Testing : " + testDescription);
|
|
|
}
|
|
|
sock = new Socket();
|
|
|
- sock.connect(dnAddr, FSConstants.READ_TIMEOUT);
|
|
|
- sock.setSoTimeout(FSConstants.READ_TIMEOUT);
|
|
|
+ sock.connect(dnAddr, HdfsConstants.READ_TIMEOUT);
|
|
|
+ sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
|
|
|
|
|
|
OutputStream out = sock.getOutputStream();
|
|
|
// Should we excuse
|
|
@@ -156,20 +157,20 @@ public class TestDataTransferProtocol extends TestCase {
|
|
|
sendBuf.reset();
|
|
|
|
|
|
// bad version
|
|
|
- recvOut.writeShort((short)(FSConstants.DATA_TRANSFER_VERSION-1));
|
|
|
- sendOut.writeShort((short)(FSConstants.DATA_TRANSFER_VERSION-1));
|
|
|
+ recvOut.writeShort((short)(DataTransferProtocol.DATA_TRANSFER_VERSION-1));
|
|
|
+ sendOut.writeShort((short)(DataTransferProtocol.DATA_TRANSFER_VERSION-1));
|
|
|
sendRecvData("Wrong Version", true);
|
|
|
|
|
|
// bad ops
|
|
|
sendBuf.reset();
|
|
|
- sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
|
|
|
- sendOut.writeByte((byte)(FSConstants.OP_WRITE_BLOCK-1));
|
|
|
+ sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
|
|
|
+ sendOut.writeByte((byte)(DataTransferProtocol.OP_WRITE_BLOCK-1));
|
|
|
sendRecvData("Wrong Op Code", true);
|
|
|
|
|
|
/* Test OP_WRITE_BLOCK */
|
|
|
sendBuf.reset();
|
|
|
- sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
|
|
|
- sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
|
|
|
+ sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
|
|
|
+ sendOut.writeByte((byte)DataTransferProtocol.OP_WRITE_BLOCK);
|
|
|
sendOut.writeLong(newBlockId); // block id
|
|
|
sendOut.writeLong(0); // generation stamp
|
|
|
sendOut.writeInt(0); // targets in pipeline
|
|
@@ -182,13 +183,13 @@ public class TestDataTransferProtocol extends TestCase {
|
|
|
// bad bytes per checksum
|
|
|
sendOut.writeInt(-1-random.nextInt(oneMil));
|
|
|
recvBuf.reset();
|
|
|
- recvOut.writeShort((short)FSConstants.OP_STATUS_ERROR);
|
|
|
+ recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
|
|
|
sendRecvData("wrong bytesPerChecksum while writing", true);
|
|
|
|
|
|
sendBuf.reset();
|
|
|
recvBuf.reset();
|
|
|
- sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
|
|
|
- sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
|
|
|
+ sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
|
|
|
+ sendOut.writeByte((byte)DataTransferProtocol.OP_WRITE_BLOCK);
|
|
|
sendOut.writeLong(newBlockId);
|
|
|
sendOut.writeLong(0); // generation stamp
|
|
|
sendOut.writeInt(0); // targets in pipeline
|
|
@@ -198,13 +199,13 @@ public class TestDataTransferProtocol extends TestCase {
|
|
|
|
|
|
// bad number of targets
|
|
|
sendOut.writeInt(-1-random.nextInt(oneMil));
|
|
|
- recvOut.writeShort((short)FSConstants.OP_STATUS_ERROR);
|
|
|
+ recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
|
|
|
sendRecvData("bad targets len while writing block " + newBlockId, true);
|
|
|
|
|
|
sendBuf.reset();
|
|
|
recvBuf.reset();
|
|
|
- sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
|
|
|
- sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
|
|
|
+ sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
|
|
|
+ sendOut.writeByte((byte)DataTransferProtocol.OP_WRITE_BLOCK);
|
|
|
sendOut.writeLong(++newBlockId);
|
|
|
sendOut.writeLong(0); // generation stamp
|
|
|
sendOut.writeInt(0); // targets in pipeline
|
|
@@ -223,15 +224,15 @@ public class TestDataTransferProtocol extends TestCase {
|
|
|
sendOut.writeInt(-1-random.nextInt(oneMil));
|
|
|
Text.writeString(recvOut, ""); // first bad node
|
|
|
recvOut.writeLong(100); // sequencenumber
|
|
|
- recvOut.writeShort((short)FSConstants.OP_STATUS_ERROR);
|
|
|
+ recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
|
|
|
sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId,
|
|
|
true);
|
|
|
|
|
|
// test for writing a valid zero size block
|
|
|
sendBuf.reset();
|
|
|
recvBuf.reset();
|
|
|
- sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
|
|
|
- sendOut.writeByte((byte)FSConstants.OP_WRITE_BLOCK);
|
|
|
+ sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
|
|
|
+ sendOut.writeByte((byte)DataTransferProtocol.OP_WRITE_BLOCK);
|
|
|
sendOut.writeLong(++newBlockId);
|
|
|
sendOut.writeLong(0); // generation stamp
|
|
|
sendOut.writeInt(0); // targets in pipeline
|
|
@@ -251,7 +252,7 @@ public class TestDataTransferProtocol extends TestCase {
|
|
|
//ok finally write a block with 0 len
|
|
|
Text.writeString(recvOut, ""); // first bad node
|
|
|
recvOut.writeLong(100); // sequencenumber
|
|
|
- recvOut.writeShort((short)FSConstants.OP_STATUS_SUCCESS);
|
|
|
+ recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
|
|
|
sendRecvData("Writing a zero len block blockid " + newBlockId, false);
|
|
|
|
|
|
/* Test OP_READ_BLOCK */
|
|
@@ -259,21 +260,21 @@ public class TestDataTransferProtocol extends TestCase {
|
|
|
// bad block id
|
|
|
sendBuf.reset();
|
|
|
recvBuf.reset();
|
|
|
- sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
|
|
|
- sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
|
|
|
+ sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
|
|
|
+ sendOut.writeByte((byte)DataTransferProtocol.OP_READ_BLOCK);
|
|
|
newBlockId = firstBlock.getBlockId()-1;
|
|
|
sendOut.writeLong(newBlockId);
|
|
|
sendOut.writeLong(firstBlock.getGenerationStamp());
|
|
|
sendOut.writeLong(0L);
|
|
|
sendOut.writeLong(fileLen);
|
|
|
- recvOut.writeShort((short)FSConstants.OP_STATUS_ERROR);
|
|
|
+ recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
|
|
|
Text.writeString(sendOut, "cl");
|
|
|
sendRecvData("Wrong block ID " + newBlockId + " for read", false);
|
|
|
|
|
|
// negative block start offset
|
|
|
sendBuf.reset();
|
|
|
- sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
|
|
|
- sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
|
|
|
+ sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
|
|
|
+ sendOut.writeByte((byte)DataTransferProtocol.OP_READ_BLOCK);
|
|
|
sendOut.writeLong(firstBlock.getBlockId());
|
|
|
sendOut.writeLong(firstBlock.getGenerationStamp());
|
|
|
sendOut.writeLong(-1L);
|
|
@@ -284,8 +285,8 @@ public class TestDataTransferProtocol extends TestCase {
|
|
|
|
|
|
// bad block start offset
|
|
|
sendBuf.reset();
|
|
|
- sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
|
|
|
- sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
|
|
|
+ sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
|
|
|
+ sendOut.writeByte((byte)DataTransferProtocol.OP_READ_BLOCK);
|
|
|
sendOut.writeLong(firstBlock.getBlockId());
|
|
|
sendOut.writeLong(firstBlock.getGenerationStamp());
|
|
|
sendOut.writeLong(fileLen);
|
|
@@ -296,10 +297,10 @@ public class TestDataTransferProtocol extends TestCase {
|
|
|
|
|
|
// negative length is ok. Datanode assumes we want to read the whole block.
|
|
|
recvBuf.reset();
|
|
|
- recvOut.writeShort((short)FSConstants.OP_STATUS_SUCCESS);
|
|
|
+ recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
|
|
|
sendBuf.reset();
|
|
|
- sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
|
|
|
- sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
|
|
|
+ sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
|
|
|
+ sendOut.writeByte((byte)DataTransferProtocol.OP_READ_BLOCK);
|
|
|
sendOut.writeLong(firstBlock.getBlockId());
|
|
|
sendOut.writeLong(firstBlock.getGenerationStamp());
|
|
|
sendOut.writeLong(0);
|
|
@@ -310,10 +311,10 @@ public class TestDataTransferProtocol extends TestCase {
|
|
|
|
|
|
// length is more than size of block.
|
|
|
recvBuf.reset();
|
|
|
- recvOut.writeShort((short)FSConstants.OP_STATUS_ERROR);
|
|
|
+ recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
|
|
|
sendBuf.reset();
|
|
|
- sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
|
|
|
- sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
|
|
|
+ sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
|
|
|
+ sendOut.writeByte((byte)DataTransferProtocol.OP_READ_BLOCK);
|
|
|
sendOut.writeLong(firstBlock.getBlockId());
|
|
|
sendOut.writeLong(firstBlock.getGenerationStamp());
|
|
|
sendOut.writeLong(0);
|
|
@@ -324,8 +325,8 @@ public class TestDataTransferProtocol extends TestCase {
|
|
|
|
|
|
//At the end of all this, read the file to make sure that succeeds finally.
|
|
|
sendBuf.reset();
|
|
|
- sendOut.writeShort((short)FSConstants.DATA_TRANSFER_VERSION);
|
|
|
- sendOut.writeByte((byte)FSConstants.OP_READ_BLOCK);
|
|
|
+ sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
|
|
|
+ sendOut.writeByte((byte)DataTransferProtocol.OP_READ_BLOCK);
|
|
|
sendOut.writeLong(firstBlock.getBlockId());
|
|
|
sendOut.writeLong(firstBlock.getGenerationStamp());
|
|
|
sendOut.writeLong(0);
|