|
@@ -29,8 +29,10 @@ import java.net.SocketException;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
|
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
|
|
|
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
|
|
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
|
|
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
|
|
import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.io.MD5Hash;
|
|
import org.apache.hadoop.io.MD5Hash;
|
|
@@ -74,7 +76,7 @@ class DataXceiver implements Runnable, FSConstants {
|
|
new BufferedInputStream(NetUtils.getInputStream(s),
|
|
new BufferedInputStream(NetUtils.getInputStream(s),
|
|
SMALL_BUFFER_SIZE));
|
|
SMALL_BUFFER_SIZE));
|
|
short version = in.readShort();
|
|
short version = in.readShort();
|
|
- if ( version != DATA_TRANSFER_VERSION ) {
|
|
|
|
|
|
+ if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {
|
|
throw new IOException( "Version Mismatch" );
|
|
throw new IOException( "Version Mismatch" );
|
|
}
|
|
}
|
|
boolean local = s.getInetAddress().equals(s.getLocalAddress());
|
|
boolean local = s.getInetAddress().equals(s.getLocalAddress());
|
|
@@ -88,7 +90,7 @@ class DataXceiver implements Runnable, FSConstants {
|
|
}
|
|
}
|
|
long startTime = DataNode.now();
|
|
long startTime = DataNode.now();
|
|
switch ( op ) {
|
|
switch ( op ) {
|
|
- case OP_READ_BLOCK:
|
|
|
|
|
|
+ case DataTransferProtocol.OP_READ_BLOCK:
|
|
readBlock( in );
|
|
readBlock( in );
|
|
datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
|
|
datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
|
|
if (local)
|
|
if (local)
|
|
@@ -96,7 +98,7 @@ class DataXceiver implements Runnable, FSConstants {
|
|
else
|
|
else
|
|
datanode.myMetrics.readsFromRemoteClient.inc();
|
|
datanode.myMetrics.readsFromRemoteClient.inc();
|
|
break;
|
|
break;
|
|
- case OP_WRITE_BLOCK:
|
|
|
|
|
|
+ case DataTransferProtocol.OP_WRITE_BLOCK:
|
|
writeBlock( in );
|
|
writeBlock( in );
|
|
datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
|
|
datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
|
|
if (local)
|
|
if (local)
|
|
@@ -104,19 +106,20 @@ class DataXceiver implements Runnable, FSConstants {
|
|
else
|
|
else
|
|
datanode.myMetrics.writesFromRemoteClient.inc();
|
|
datanode.myMetrics.writesFromRemoteClient.inc();
|
|
break;
|
|
break;
|
|
- case OP_READ_METADATA:
|
|
|
|
|
|
+ case DataTransferProtocol.OP_READ_METADATA:
|
|
readMetadata( in );
|
|
readMetadata( in );
|
|
datanode.myMetrics.readMetadataOp.inc(DataNode.now() - startTime);
|
|
datanode.myMetrics.readMetadataOp.inc(DataNode.now() - startTime);
|
|
break;
|
|
break;
|
|
- case OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
|
|
|
|
|
|
+ case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
|
|
replaceBlock(in);
|
|
replaceBlock(in);
|
|
datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
|
|
datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
|
|
break;
|
|
break;
|
|
- case OP_COPY_BLOCK: // for balancing purpose; send to a proxy source
|
|
|
|
|
|
+ case DataTransferProtocol.OP_COPY_BLOCK:
|
|
|
|
+ // for balancing purpose; send to a proxy source
|
|
copyBlock(in);
|
|
copyBlock(in);
|
|
datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
|
|
datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
|
|
break;
|
|
break;
|
|
- case OP_BLOCK_CHECKSUM: //get the checksum of a block
|
|
|
|
|
|
+ case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block
|
|
getBlockChecksum(in);
|
|
getBlockChecksum(in);
|
|
datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
|
|
datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
|
|
break;
|
|
break;
|
|
@@ -168,18 +171,18 @@ class DataXceiver implements Runnable, FSConstants {
|
|
blockSender = new BlockSender(block, startOffset, length,
|
|
blockSender = new BlockSender(block, startOffset, length,
|
|
true, true, false, datanode, clientTraceFmt);
|
|
true, true, false, datanode, clientTraceFmt);
|
|
} catch(IOException e) {
|
|
} catch(IOException e) {
|
|
- out.writeShort(OP_STATUS_ERROR);
|
|
|
|
|
|
+ out.writeShort(DataTransferProtocol.OP_STATUS_ERROR);
|
|
throw e;
|
|
throw e;
|
|
}
|
|
}
|
|
|
|
|
|
- out.writeShort(DataNode.OP_STATUS_SUCCESS); // send op status
|
|
|
|
|
|
+ out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // send op status
|
|
long read = blockSender.sendBlock(out, baseStream, null); // send data
|
|
long read = blockSender.sendBlock(out, baseStream, null); // send data
|
|
|
|
|
|
if (blockSender.isBlockReadFully()) {
|
|
if (blockSender.isBlockReadFully()) {
|
|
// See if client verification succeeded.
|
|
// See if client verification succeeded.
|
|
// This is an optional response from client.
|
|
// This is an optional response from client.
|
|
try {
|
|
try {
|
|
- if (in.readShort() == OP_STATUS_CHECKSUM_OK &&
|
|
|
|
|
|
+ if (in.readShort() == DataTransferProtocol.OP_STATUS_CHECKSUM_OK &&
|
|
datanode.blockScanner != null) {
|
|
datanode.blockScanner != null) {
|
|
datanode.blockScanner.verifiedByClient(block);
|
|
datanode.blockScanner.verifiedByClient(block);
|
|
}
|
|
}
|
|
@@ -274,7 +277,7 @@ class DataXceiver implements Runnable, FSConstants {
|
|
try {
|
|
try {
|
|
int timeoutValue = numTargets * datanode.socketTimeout;
|
|
int timeoutValue = numTargets * datanode.socketTimeout;
|
|
int writeTimeout = datanode.socketWriteTimeout +
|
|
int writeTimeout = datanode.socketWriteTimeout +
|
|
- (WRITE_TIMEOUT_EXTENSION * numTargets);
|
|
|
|
|
|
+ (HdfsConstants.WRITE_TIMEOUT_EXTENSION * numTargets);
|
|
mirrorSock.connect(mirrorTarget, timeoutValue);
|
|
mirrorSock.connect(mirrorTarget, timeoutValue);
|
|
mirrorSock.setSoTimeout(timeoutValue);
|
|
mirrorSock.setSoTimeout(timeoutValue);
|
|
mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
|
|
mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
|
|
@@ -285,8 +288,8 @@ class DataXceiver implements Runnable, FSConstants {
|
|
mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
|
|
mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
|
|
|
|
|
|
// Write header: Copied from DFSClient.java!
|
|
// Write header: Copied from DFSClient.java!
|
|
- mirrorOut.writeShort( DATA_TRANSFER_VERSION );
|
|
|
|
- mirrorOut.write( OP_WRITE_BLOCK );
|
|
|
|
|
|
+ mirrorOut.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
|
|
|
|
+ mirrorOut.write( DataTransferProtocol.OP_WRITE_BLOCK );
|
|
mirrorOut.writeLong( block.getBlockId() );
|
|
mirrorOut.writeLong( block.getBlockId() );
|
|
mirrorOut.writeLong( block.getGenerationStamp() );
|
|
mirrorOut.writeLong( block.getGenerationStamp() );
|
|
mirrorOut.writeInt( pipelineSize );
|
|
mirrorOut.writeInt( pipelineSize );
|
|
@@ -407,7 +410,7 @@ class DataXceiver implements Runnable, FSConstants {
|
|
out = new DataOutputStream(
|
|
out = new DataOutputStream(
|
|
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
|
|
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
|
|
|
|
|
|
- out.writeByte(OP_STATUS_SUCCESS);
|
|
|
|
|
|
+ out.writeByte(DataTransferProtocol.OP_STATUS_SUCCESS);
|
|
out.writeInt(buf.length);
|
|
out.writeInt(buf.length);
|
|
out.write(buf);
|
|
out.write(buf);
|
|
|
|
|
|
@@ -450,7 +453,7 @@ class DataXceiver implements Runnable, FSConstants {
|
|
//write reply
|
|
//write reply
|
|
out = new DataOutputStream(
|
|
out = new DataOutputStream(
|
|
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
|
|
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
|
|
- out.writeShort(OP_STATUS_SUCCESS);
|
|
|
|
|
|
+ out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
|
|
out.writeInt(bytesPerCRC);
|
|
out.writeInt(bytesPerCRC);
|
|
out.writeLong(crcPerBlock);
|
|
out.writeLong(crcPerBlock);
|
|
md5.write(out);
|
|
md5.write(out);
|
|
@@ -477,7 +480,7 @@ class DataXceiver implements Runnable, FSConstants {
|
|
target.readFields(in);
|
|
target.readFields(in);
|
|
|
|
|
|
Socket targetSock = null;
|
|
Socket targetSock = null;
|
|
- short opStatus = OP_STATUS_SUCCESS;
|
|
|
|
|
|
+ short opStatus = DataTransferProtocol.OP_STATUS_SUCCESS;
|
|
BlockSender blockSender = null;
|
|
BlockSender blockSender = null;
|
|
DataOutputStream targetOut = null;
|
|
DataOutputStream targetOut = null;
|
|
try {
|
|
try {
|
|
@@ -501,8 +504,8 @@ class DataXceiver implements Runnable, FSConstants {
|
|
|
|
|
|
/* send request to the target */
|
|
/* send request to the target */
|
|
// fist write header info
|
|
// fist write header info
|
|
- targetOut.writeShort(DATA_TRANSFER_VERSION); // transfer version
|
|
|
|
- targetOut.writeByte(OP_REPLACE_BLOCK); // op code
|
|
|
|
|
|
+ targetOut.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); // transfer version
|
|
|
|
+ targetOut.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK); // op code
|
|
targetOut.writeLong(block.getBlockId()); // block id
|
|
targetOut.writeLong(block.getBlockId()); // block id
|
|
targetOut.writeLong(block.getGenerationStamp()); // block id
|
|
targetOut.writeLong(block.getGenerationStamp()); // block id
|
|
Text.writeString( targetOut, source); // del hint
|
|
Text.writeString( targetOut, source); // del hint
|
|
@@ -519,7 +522,7 @@ class DataXceiver implements Runnable, FSConstants {
|
|
|
|
|
|
LOG.info("Copied block " + block + " to " + targetAddr);
|
|
LOG.info("Copied block " + block + " to " + targetAddr);
|
|
} catch (IOException ioe) {
|
|
} catch (IOException ioe) {
|
|
- opStatus = OP_STATUS_ERROR;
|
|
|
|
|
|
+ opStatus = DataTransferProtocol.OP_STATUS_ERROR;
|
|
LOG.warn("Got exception while serving " + block + " to "
|
|
LOG.warn("Got exception while serving " + block + " to "
|
|
+ target.getName() + ": " + StringUtils.stringifyException(ioe));
|
|
+ target.getName() + ": " + StringUtils.stringifyException(ioe));
|
|
throw ioe;
|
|
throw ioe;
|
|
@@ -553,7 +556,7 @@ class DataXceiver implements Runnable, FSConstants {
|
|
in.readLong()); // block id & len
|
|
in.readLong()); // block id & len
|
|
String sourceID = Text.readString(in);
|
|
String sourceID = Text.readString(in);
|
|
|
|
|
|
- short opStatus = OP_STATUS_SUCCESS;
|
|
|
|
|
|
+ short opStatus = DataTransferProtocol.OP_STATUS_SUCCESS;
|
|
BlockReceiver blockReceiver = null;
|
|
BlockReceiver blockReceiver = null;
|
|
try {
|
|
try {
|
|
// open a block receiver and check if the block does not exist
|
|
// open a block receiver and check if the block does not exist
|
|
@@ -571,7 +574,7 @@ class DataXceiver implements Runnable, FSConstants {
|
|
LOG.info("Moved block " + block +
|
|
LOG.info("Moved block " + block +
|
|
" from " + s.getRemoteSocketAddress());
|
|
" from " + s.getRemoteSocketAddress());
|
|
} catch (IOException ioe) {
|
|
} catch (IOException ioe) {
|
|
- opStatus = OP_STATUS_ERROR;
|
|
|
|
|
|
+ opStatus = DataTransferProtocol.OP_STATUS_ERROR;
|
|
throw ioe;
|
|
throw ioe;
|
|
} finally {
|
|
} finally {
|
|
// send response back
|
|
// send response back
|
|
@@ -597,7 +600,7 @@ class DataXceiver implements Runnable, FSConstants {
|
|
try {
|
|
try {
|
|
for (int i = 0; i < numTargets; i++) {
|
|
for (int i = 0; i < numTargets; i++) {
|
|
short opStatus = reply.readShort();
|
|
short opStatus = reply.readShort();
|
|
- if(opStatus != OP_STATUS_SUCCESS) {
|
|
|
|
|
|
+ if(opStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
|
|
throw new IOException("operation failed at "+
|
|
throw new IOException("operation failed at "+
|
|
s.getInetAddress());
|
|
s.getInetAddress());
|
|
}
|
|
}
|