|
@@ -891,9 +891,14 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
*/
|
|
|
class DataXceiver implements Runnable {
|
|
|
Socket s;
|
|
|
+ String remoteAddress; // address of remote side
|
|
|
+ String localAddress; // local address of this daemon
|
|
|
public DataXceiver(Socket s) {
|
|
|
this.s = s;
|
|
|
childSockets.put(s, s);
|
|
|
+ InetSocketAddress isock = (InetSocketAddress)s.getRemoteSocketAddress();
|
|
|
+ remoteAddress = isock.toString();
|
|
|
+ localAddress = s.getInetAddress() + ":" + s.getLocalPort();
|
|
|
LOG.debug("Number of active connections is: "+xceiverCount);
|
|
|
}
|
|
|
|
|
@@ -1032,7 +1037,9 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
// Read in the header
|
|
|
//
|
|
|
Block block = new Block(in.readLong(), estimateBlockSize);
|
|
|
- LOG.info("Receiving block " + block + " from " + s.getInetAddress());
|
|
|
+ LOG.info("Receiving block " + block +
|
|
|
+ " src: " + remoteAddress +
|
|
|
+ " dest: " + localAddress);
|
|
|
int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
|
|
|
boolean isRecovery = in.readBoolean(); // is this part of recovery?
|
|
|
String client = Text.readString(in); // working on behalf of this client
|
|
@@ -1047,7 +1054,6 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
targets[i] = tmp;
|
|
|
}
|
|
|
|
|
|
- short opStatus = OP_STATUS_SUCCESS; // write operation status
|
|
|
DataOutputStream mirrorOut = null; // stream to next target
|
|
|
DataInputStream mirrorIn = null; // reply from next target
|
|
|
DataOutputStream replyOut = null; // stream to prev target
|
|
@@ -1096,16 +1102,20 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
blockReceiver.writeChecksumHeader(mirrorOut);
|
|
|
mirrorOut.flush();
|
|
|
|
|
|
- // read connect ack
|
|
|
- firstBadLink = Text.readString(mirrorIn);
|
|
|
- LOG.info("Datanode " + targets.length +
|
|
|
- " got response for connect ack " +
|
|
|
- " from downstream datanode with firstbadlink as " +
|
|
|
- firstBadLink);
|
|
|
+ // read connect ack (only for clients, not for replication req)
|
|
|
+ if (client.length() != 0) {
|
|
|
+ firstBadLink = Text.readString(mirrorIn);
|
|
|
+ LOG.info("Datanode " + targets.length +
|
|
|
+ " got response for connect ack " +
|
|
|
+ " from downstream datanode with firstbadlink as " +
|
|
|
+ firstBadLink);
|
|
|
+ }
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
- Text.writeString(replyOut, mirrorNode);
|
|
|
- replyOut.flush();
|
|
|
+ if (client.length() != 0) {
|
|
|
+ Text.writeString(replyOut, mirrorNode);
|
|
|
+ replyOut.flush();
|
|
|
+ }
|
|
|
IOUtils.closeStream(mirrorOut);
|
|
|
mirrorOut = null;
|
|
|
IOUtils.closeStream(mirrorIn);
|
|
@@ -1116,12 +1126,14 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // send connect ack back to source
|
|
|
- LOG.info("Datanode " + targets.length +
|
|
|
- " forwarding connect ack to upstream firstbadlink is " +
|
|
|
- firstBadLink);
|
|
|
- Text.writeString(replyOut, firstBadLink);
|
|
|
- replyOut.flush();
|
|
|
+ // send connect ack back to source (only for clients)
|
|
|
+ if (client.length() != 0) {
|
|
|
+ LOG.info("Datanode " + targets.length +
|
|
|
+ " forwarding connect ack to upstream firstbadlink is " +
|
|
|
+ firstBadLink);
|
|
|
+ Text.writeString(replyOut, firstBadLink);
|
|
|
+ replyOut.flush();
|
|
|
+ }
|
|
|
|
|
|
// receive the block and mirror to the next target
|
|
|
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
|
|
@@ -1133,10 +1145,10 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
// the block is finalized in the PacketResponder.
|
|
|
if (client.length() == 0) {
|
|
|
notifyNamenodeReceivedBlock(block, EMPTY_DEL_HINT);
|
|
|
- LOG.info("Received block " + block +
|
|
|
- " of size " + block.getNumBytes() +
|
|
|
- " from " + s.getInetAddress());
|
|
|
-
|
|
|
+ LOG.info("Received block " + block +
|
|
|
+ " src: " + remoteAddress +
|
|
|
+ " dest: " + localAddress +
|
|
|
+ " of size " + block.getNumBytes());
|
|
|
}
|
|
|
|
|
|
if (blockScanner != null) {
|
|
@@ -1145,7 +1157,6 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
|
|
|
} catch (IOException ioe) {
|
|
|
LOG.info("writeBlock " + block + " received exception " + ioe);
|
|
|
- opStatus = OP_STATUS_ERROR;
|
|
|
throw ioe;
|
|
|
} finally {
|
|
|
// close all opened streams
|
|
@@ -2185,13 +2196,6 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
this.mirrorAddr = mirrorAddr;
|
|
|
this.throttler = throttler;
|
|
|
|
|
|
- /*
|
|
|
- * We need an estimate for block size to check if the disk partition has
|
|
|
- * enough space. For now we just increment FSDataset.reserved by
|
|
|
- * configured dfs.block.size Other alternative is to include the block
|
|
|
- * size in the header sent by DFSClient.
|
|
|
- */
|
|
|
-
|
|
|
try {
|
|
|
// write data chunk header
|
|
|
if (!finalized) {
|