|
@@ -23,6 +23,7 @@ import java.io.DataOutputStream;
|
|
|
import java.io.EOFException;
|
|
|
import java.io.IOException;
|
|
|
import java.io.OutputStream;
|
|
|
+import java.net.InetAddress;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.zip.CRC32;
|
|
@@ -39,6 +40,7 @@ import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.util.Daemon;
|
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
+import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
|
|
|
|
|
|
/** A class that receives a block and writes to its own disk, meanwhile
|
|
|
* may copies it to another site. If a throttler is provided,
|
|
@@ -46,6 +48,7 @@ import org.apache.hadoop.util.StringUtils;
|
|
|
**/
|
|
|
class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
public static final Log LOG = DataNode.LOG;
|
|
|
+ static final Log ClientTraceLog = DataNode.ClientTraceLog;
|
|
|
|
|
|
private Block block; // the block to receive
|
|
|
protected boolean finalized;
|
|
@@ -60,6 +63,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
private int maxPacketReadLen;
|
|
|
protected long offsetInBlock;
|
|
|
protected final String inAddr;
|
|
|
+ protected final String myAddr;
|
|
|
private String mirrorAddr;
|
|
|
private DataOutputStream mirrorOut;
|
|
|
private Daemon responder = null;
|
|
@@ -72,12 +76,13 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
private DataNode datanode = null;
|
|
|
|
|
|
BlockReceiver(Block block, DataInputStream in, String inAddr,
|
|
|
- boolean isRecovery, String clientName,
|
|
|
+ String myAddr, boolean isRecovery, String clientName,
|
|
|
DatanodeInfo srcDataNode, DataNode datanode) throws IOException {
|
|
|
try{
|
|
|
this.block = block;
|
|
|
this.in = in;
|
|
|
this.inAddr = inAddr;
|
|
|
+ this.myAddr = myAddr;
|
|
|
this.isRecovery = isRecovery;
|
|
|
this.clientName = clientName;
|
|
|
this.offsetInBlock = 0;
|
|
@@ -498,8 +503,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
if (clientName.length() > 0) {
|
|
|
responder = new Daemon(datanode.threadGroup,
|
|
|
new PacketResponder(this, block, mirrIn,
|
|
|
- replyOut, numTargets,
|
|
|
- clientName));
|
|
|
+ replyOut, numTargets));
|
|
|
responder.start(); // start thread to processes reponses
|
|
|
}
|
|
|
|
|
@@ -673,7 +677,6 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
DataInputStream mirrorIn; // input from downstream datanode
|
|
|
DataOutputStream replyOut; // output to upstream datanode
|
|
|
private int numTargets; // number of downstream datanodes including myself
|
|
|
- private String clientName; // The name of the client (if any)
|
|
|
private BlockReceiver receiver; // The owner of this responder.
|
|
|
|
|
|
public String toString() {
|
|
@@ -681,13 +684,12 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
}
|
|
|
|
|
|
PacketResponder(BlockReceiver receiver, Block b, DataInputStream in,
|
|
|
- DataOutputStream out, int numTargets, String clientName) {
|
|
|
+ DataOutputStream out, int numTargets) {
|
|
|
this.receiver = receiver;
|
|
|
this.block = b;
|
|
|
mirrorIn = in;
|
|
|
replyOut = out;
|
|
|
this.numTargets = numTargets;
|
|
|
- this.clientName = clientName;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -776,9 +778,17 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
datanode.myMetrics.blocksWritten.inc();
|
|
|
datanode.notifyNamenodeReceivedBlock(block,
|
|
|
DataNode.EMPTY_DEL_HINT);
|
|
|
- LOG.info("Received block " + block +
|
|
|
- " of size " + block.getNumBytes() +
|
|
|
- " from " + receiver.inAddr);
|
|
|
+ if (ClientTraceLog.isInfoEnabled() &&
|
|
|
+ receiver.clientName.length() > 0) {
|
|
|
+ ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
|
|
|
+ receiver.inAddr, receiver.myAddr, block.getNumBytes(),
|
|
|
+ "HDFS_WRITE", receiver.clientName,
|
|
|
+ datanode.dnRegistration.getStorageID(), block));
|
|
|
+ } else {
|
|
|
+ LOG.info("Received block " + block +
|
|
|
+ " of size " + block.getNumBytes() +
|
|
|
+ " from " + receiver.inAddr);
|
|
|
+ }
|
|
|
}
|
|
|
lastPacket = true;
|
|
|
}
|
|
@@ -891,9 +901,17 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
datanode.myMetrics.blocksWritten.inc();
|
|
|
datanode.notifyNamenodeReceivedBlock(block,
|
|
|
DataNode.EMPTY_DEL_HINT);
|
|
|
- LOG.info("Received block " + block +
|
|
|
- " of size " + block.getNumBytes() +
|
|
|
- " from " + receiver.inAddr);
|
|
|
+ if (ClientTraceLog.isInfoEnabled() &&
|
|
|
+ receiver.clientName.length() > 0) {
|
|
|
+ ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
|
|
|
+ receiver.inAddr, receiver.myAddr, block.getNumBytes(),
|
|
|
+ "HDFS_WRITE", receiver.clientName,
|
|
|
+ datanode.dnRegistration.getStorageID(), block));
|
|
|
+ } else {
|
|
|
+ LOG.info("Received block " + block +
|
|
|
+ " of size " + block.getNumBytes() +
|
|
|
+ " from " + receiver.inAddr);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// send my status back to upstream datanode
|
|
@@ -932,7 +950,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
// If we forwarded an error response from a downstream datanode
|
|
|
// and we are acting on behalf of a client, then we quit. The
|
|
|
// client will drive the recovery mechanism.
|
|
|
- if (op == OP_STATUS_ERROR && clientName.length() > 0) {
|
|
|
+ if (op == OP_STATUS_ERROR && receiver.clientName.length() > 0) {
|
|
|
running = false;
|
|
|
}
|
|
|
} catch (IOException e) {
|