|
@@ -17,6 +17,12 @@
|
|
|
*/
|
|
|
package org.apache.hadoop.hdfs.server.datanode;
|
|
|
|
|
|
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
|
|
|
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
|
|
|
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
|
|
|
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
|
|
|
+import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
|
|
|
+
|
|
|
import java.io.BufferedInputStream;
|
|
|
import java.io.BufferedOutputStream;
|
|
|
import java.io.DataInputStream;
|
|
@@ -42,7 +48,6 @@ import org.apache.hadoop.security.AccessToken;
|
|
|
import org.apache.hadoop.security.AccessTokenHandler;
|
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
-import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
|
|
|
|
|
|
/**
|
|
|
* Thread for processing incoming/outgoing data stream.
|
|
@@ -79,7 +84,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
in = new DataInputStream(
|
|
|
new BufferedInputStream(NetUtils.getInputStream(s),
|
|
|
SMALL_BUFFER_SIZE));
|
|
|
- final byte op = op(in);
|
|
|
+ final DataTransferProtocol.Op op = op(in);
|
|
|
boolean local = s.getInetAddress().equals(s.getLocalAddress());
|
|
|
// Make sure the xciver count is not exceeded
|
|
|
int curXceiverCount = datanode.getXceiverCount();
|
|
@@ -90,7 +95,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
}
|
|
|
long startTime = DataNode.now();
|
|
|
switch ( op ) {
|
|
|
- case DataTransferProtocol.OP_READ_BLOCK:
|
|
|
+ case READ_BLOCK:
|
|
|
opReadBlock(in);
|
|
|
datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
|
|
|
if (local)
|
|
@@ -98,7 +103,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
else
|
|
|
datanode.myMetrics.readsFromRemoteClient.inc();
|
|
|
break;
|
|
|
- case DataTransferProtocol.OP_WRITE_BLOCK:
|
|
|
+ case WRITE_BLOCK:
|
|
|
opWriteBlock(in);
|
|
|
datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
|
|
|
if (local)
|
|
@@ -106,16 +111,16 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
else
|
|
|
datanode.myMetrics.writesFromRemoteClient.inc();
|
|
|
break;
|
|
|
- case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
|
|
|
+ case REPLACE_BLOCK: // for balancing purpose; send to a destination
|
|
|
opReplaceBlock(in);
|
|
|
datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
|
|
|
break;
|
|
|
- case DataTransferProtocol.OP_COPY_BLOCK:
|
|
|
+ case COPY_BLOCK:
|
|
|
// for balancing purpose; send to a proxy source
|
|
|
opCopyBlock(in);
|
|
|
datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
|
|
|
break;
|
|
|
- case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block
|
|
|
+ case BLOCK_CHECKSUM: //get the checksum of a block
|
|
|
opBlockChecksum(in);
|
|
|
datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
|
|
|
break;
|
|
@@ -150,7 +155,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
&& !datanode.accessTokenHandler.checkAccess(accessToken, null, blockId,
|
|
|
AccessTokenHandler.AccessMode.READ)) {
|
|
|
try {
|
|
|
- out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
|
|
|
+ ERROR_ACCESS_TOKEN.write(out);
|
|
|
out.flush();
|
|
|
throw new IOException("Access token verification failed, on client "
|
|
|
+ "request for reading block " + block);
|
|
@@ -172,19 +177,19 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
blockSender = new BlockSender(block, startOffset, length,
|
|
|
true, true, false, datanode, clientTraceFmt);
|
|
|
} catch(IOException e) {
|
|
|
- out.writeShort(DataTransferProtocol.OP_STATUS_ERROR);
|
|
|
+ ERROR.write(out);
|
|
|
throw e;
|
|
|
}
|
|
|
|
|
|
- out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // send op status
|
|
|
+ SUCCESS.write(out); // send op status
|
|
|
long read = blockSender.sendBlock(out, baseStream, null); // send data
|
|
|
|
|
|
if (blockSender.isBlockReadFully()) {
|
|
|
// See if client verification succeeded.
|
|
|
// This is an optional response from client.
|
|
|
try {
|
|
|
- if (in.readShort() == DataTransferProtocol.OP_STATUS_CHECKSUM_OK &&
|
|
|
- datanode.blockScanner != null) {
|
|
|
+ if (DataTransferProtocol.Status.read(in) == CHECKSUM_OK
|
|
|
+ && datanode.blockScanner != null) {
|
|
|
datanode.blockScanner.verifiedByClient(block);
|
|
|
}
|
|
|
} catch (IOException ignored) {}
|
|
@@ -238,7 +243,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
.getBlockId(), AccessTokenHandler.AccessMode.WRITE)) {
|
|
|
try {
|
|
|
if (client.length() != 0) {
|
|
|
- replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
|
|
|
+ ERROR_ACCESS_TOKEN.write(replyOut);
|
|
|
Text.writeString(replyOut, datanode.dnRegistration.getName());
|
|
|
replyOut.flush();
|
|
|
}
|
|
@@ -255,7 +260,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
BlockReceiver blockReceiver = null; // responsible for data handling
|
|
|
String mirrorNode = null; // the name:port of next target
|
|
|
String firstBadLink = ""; // first datanode that failed in connection setup
|
|
|
- short mirrorInStatus = (short)DataTransferProtocol.OP_STATUS_SUCCESS;
|
|
|
+ DataTransferProtocol.Status mirrorInStatus = SUCCESS;
|
|
|
try {
|
|
|
// open a block receiver and check if the block does not exist
|
|
|
blockReceiver = new BlockReceiver(block, in,
|
|
@@ -296,9 +301,9 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
|
|
|
// read connect ack (only for clients, not for replication req)
|
|
|
if (client.length() != 0) {
|
|
|
- mirrorInStatus = mirrorIn.readShort();
|
|
|
+ mirrorInStatus = DataTransferProtocol.Status.read(mirrorIn);
|
|
|
firstBadLink = Text.readString(mirrorIn);
|
|
|
- if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
|
|
|
+ if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
|
|
|
LOG.info("Datanode " + targets.length +
|
|
|
" got response for connect ack " +
|
|
|
" from downstream datanode with firstbadlink as " +
|
|
@@ -308,7 +313,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
if (client.length() != 0) {
|
|
|
- replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
|
|
|
+ ERROR.write(replyOut);
|
|
|
Text.writeString(replyOut, mirrorNode);
|
|
|
replyOut.flush();
|
|
|
}
|
|
@@ -331,12 +336,12 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
|
|
|
// send connect ack back to source (only for clients)
|
|
|
if (client.length() != 0) {
|
|
|
- if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
|
|
|
+ if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
|
|
|
LOG.info("Datanode " + targets.length +
|
|
|
" forwarding connect ack to upstream firstbadlink is " +
|
|
|
firstBadLink);
|
|
|
}
|
|
|
- replyOut.writeShort(mirrorInStatus);
|
|
|
+ mirrorInStatus.write(replyOut);
|
|
|
Text.writeString(replyOut, firstBadLink);
|
|
|
replyOut.flush();
|
|
|
}
|
|
@@ -387,7 +392,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
&& !datanode.accessTokenHandler.checkAccess(accessToken, null, block
|
|
|
.getBlockId(), AccessTokenHandler.AccessMode.READ)) {
|
|
|
try {
|
|
|
- out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
|
|
|
+ ERROR_ACCESS_TOKEN.write(out);
|
|
|
out.flush();
|
|
|
throw new IOException(
|
|
|
"Access token verification failed, on getBlockChecksum() "
|
|
@@ -418,7 +423,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
}
|
|
|
|
|
|
//write reply
|
|
|
- out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
|
|
|
+ SUCCESS.write(out);
|
|
|
out.writeInt(bytesPerCRC);
|
|
|
out.writeLong(crcPerBlock);
|
|
|
md5.write(out);
|
|
@@ -443,17 +448,14 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
AccessTokenHandler.AccessMode.COPY)) {
|
|
|
LOG.warn("Invalid access token in request from "
|
|
|
+ s.getRemoteSocketAddress() + " for copying block " + block);
|
|
|
- sendResponse(s,
|
|
|
- (short) DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
|
|
|
- datanode.socketWriteTimeout);
|
|
|
+ sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
|
|
|
LOG.info("Not able to copy block " + blockId + " to "
|
|
|
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.");
|
|
|
- sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR,
|
|
|
- datanode.socketWriteTimeout);
|
|
|
+ sendResponse(s, ERROR, datanode.socketWriteTimeout);
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -473,7 +475,7 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
baseStream, SMALL_BUFFER_SIZE));
|
|
|
|
|
|
// send status first
|
|
|
- reply.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
|
|
|
+ SUCCESS.write(reply);
|
|
|
// send block content to the target
|
|
|
long read = blockSender.sendBlock(reply, baseStream,
|
|
|
dataXceiverServer.balanceThrottler);
|
|
@@ -515,22 +517,20 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
AccessTokenHandler.AccessMode.REPLACE)) {
|
|
|
LOG.warn("Invalid access token in request from "
|
|
|
+ s.getRemoteSocketAddress() + " for replacing block " + block);
|
|
|
- sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
|
|
|
- datanode.socketWriteTimeout);
|
|
|
+ sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
|
|
|
LOG.warn("Not able to receive block " + blockId + " from "
|
|
|
+ s.getRemoteSocketAddress() + " because threads quota is exceeded.");
|
|
|
- sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR,
|
|
|
- datanode.socketWriteTimeout);
|
|
|
+ sendResponse(s, ERROR, datanode.socketWriteTimeout);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
Socket proxySock = null;
|
|
|
DataOutputStream proxyOut = null;
|
|
|
- short opStatus = DataTransferProtocol.OP_STATUS_SUCCESS;
|
|
|
+ DataTransferProtocol.Status opStatus = SUCCESS;
|
|
|
BlockReceiver blockReceiver = null;
|
|
|
DataInputStream proxyReply = null;
|
|
|
|
|
@@ -554,9 +554,10 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
// receive the response from the proxy
|
|
|
proxyReply = new DataInputStream(new BufferedInputStream(
|
|
|
NetUtils.getInputStream(proxySock), BUFFER_SIZE));
|
|
|
- short status = proxyReply.readShort();
|
|
|
- if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
|
|
|
- if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
|
|
|
+ final DataTransferProtocol.Status status
|
|
|
+ = DataTransferProtocol.Status.read(proxyReply);
|
|
|
+ if (status != SUCCESS) {
|
|
|
+ if (status == ERROR_ACCESS_TOKEN) {
|
|
|
throw new IOException("Copy block " + block + " from "
|
|
|
+ proxySock.getRemoteSocketAddress()
|
|
|
+ " failed due to access token error");
|
|
@@ -581,11 +582,11 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
" from " + s.getRemoteSocketAddress());
|
|
|
|
|
|
} catch (IOException ioe) {
|
|
|
- opStatus = DataTransferProtocol.OP_STATUS_ERROR;
|
|
|
+ opStatus = ERROR;
|
|
|
throw ioe;
|
|
|
} finally {
|
|
|
// receive the last byte that indicates the proxy released its thread resource
|
|
|
- if (opStatus == DataTransferProtocol.OP_STATUS_SUCCESS) {
|
|
|
+ if (opStatus == SUCCESS) {
|
|
|
try {
|
|
|
proxyReply.readChar();
|
|
|
} catch (IOException ignored) {
|
|
@@ -613,12 +614,12 @@ class DataXceiver extends DataTransferProtocol.Receiver
|
|
|
* @param opStatus status message to write
|
|
|
* @param timeout send timeout
|
|
|
**/
|
|
|
- private void sendResponse(Socket s, short opStatus, long timeout)
|
|
|
- throws IOException {
|
|
|
+ private void sendResponse(Socket s, DataTransferProtocol.Status opStatus,
|
|
|
+ long timeout) throws IOException {
|
|
|
DataOutputStream reply =
|
|
|
new DataOutputStream(NetUtils.getOutputStream(s, timeout));
|
|
|
try {
|
|
|
- reply.writeShort(opStatus);
|
|
|
+ opStatus.write(reply);
|
|
|
reply.flush();
|
|
|
} finally {
|
|
|
IOUtils.closeStream(reply);
|