@@ -35,11 +35,11 @@ import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.FSOutputSummer;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
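The newly imported PipelineAck replaces the bare longs and Status codes previously written on the reply stream: one ack now carries a packet sequence number plus one Status per datanode remaining in the pipeline. A rough sketch of that round trip, using only the constructor and accessors that appear in the hunks below; the in-memory buffers and the wrapper class are illustrative, not part of the patch:

  // Sketch only: round-tripping a PipelineAck the way the patch does over
  // replyOut/mirrorIn, here through in-memory buffers. Constructor and
  // accessor signatures are assumed from their usage in the hunks below.
  import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
  import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
  import org.apache.hadoop.io.DataInputBuffer;
  import org.apache.hadoop.io.DataOutputBuffer;

  public class PipelineAckRoundTrip {
    public static void main(String[] args) throws Exception {
      // an ack for packet 100: this datanode's SUCCESS plus one downstream SUCCESS
      PipelineAck sent =
          new PipelineAck(100L, new Status[]{Status.SUCCESS, Status.SUCCESS});
      DataOutputBuffer out = new DataOutputBuffer();
      sent.write(out);                     // what the patch writes to replyOut

      DataInputBuffer in = new DataInputBuffer();
      in.reset(out.getData(), out.getLength());
      PipelineAck received = new PipelineAck();
      received.readFields(in);             // what the upstream responder reads from mirrorIn

      System.out.println(received.getSeqno());        // 100
      System.out.println(received.getNumOfReplies()); // 2
      System.out.println(received.isSuccess());       // true only if every reply is SUCCESS
    }
  }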
@@ -805,8 +805,13 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
             // send a heartbeat if it is time.
             now = System.currentTimeMillis();
             if (now - lastHeartbeat > datanode.socketTimeout/2) {
-              replyOut.writeLong(-1); // send heartbeat
+              PipelineAck.HEART_BEAT.write(replyOut); // send heart beat
               replyOut.flush();
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("PacketResponder " + numTargets +
+                          " for block " + block +
+                          " sent a heartbeat");
+              }
               lastHeartbeat = now;
             }
           }
@@ -843,7 +848,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
             lastPacket = true;
           }

-          ackReply(expected);
+          new PipelineAck(expected, new Status[]{SUCCESS}).write(replyOut);
           replyOut.flush();
           // remove the packet from the ack queue
           removeAckHead();
@@ -870,14 +875,6 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
                " for block " + block + " terminating");
     }

-    // This method is introduced to facilitate testing. Otherwise
-    // there was a little chance to bind an AspectJ advice to such a sequence
-    // of calls
-    private void ackReply(long expected) throws IOException {
-      replyOut.writeLong(expected);
-      SUCCESS.write(replyOut);
-    }
-
     /**
      * Thread to process incoming acks.
      * @see java.lang.Runnable#run()
@@ -896,24 +893,23 @@ class BlockReceiver implements java.io.Closeable, FSConstants {

       boolean isInterrupted = false;
       try {
-        DataTransferProtocol.Status op = SUCCESS;
         boolean didRead = false;
         Packet pkt = null;
         long expected = -2;
+        PipelineAck ack = new PipelineAck();
         try {
-          // read seqno from downstream datanode
-          long seqno = mirrorIn.readLong();
+          // read an ack from downstream datanode
+          ack.readFields(mirrorIn);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("PacketResponder " + numTargets + " got " + ack);
+          }
+          long seqno = ack.getSeqno();
           didRead = true;
-          if (seqno == -1) {
-            replyOut.writeLong(-1); // send keepalive
+          if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
+            ack.write(replyOut);
             replyOut.flush();
-            LOG.debug("PacketResponder " + numTargets + " got -1");
             continue;
-          } else if (seqno == -2) {
-            LOG.debug("PacketResponder " + numTargets + " got -2");
-          } else {
-            LOG.debug("PacketResponder " + numTargets + " got seqno = " +
-                      seqno);
+          } else if (seqno >= 0) {
             synchronized (this) {
               while (running && datanode.shouldRun && ackQueue.size() == 0) {
                 if (LOG.isDebugEnabled()) {
@@ -931,7 +927,6 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
               }
               pkt = ackQueue.getFirst();
               expected = pkt.seqno;
-              LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
               if (seqno != expected) {
                 throw new IOException("PacketResponder " + numTargets +
                                       " for block " + block +
@@ -964,10 +959,6 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
             continue;
           }

-          if (!didRead) {
-            op = ERROR;
-          }
-
           // If this is the last packet in block, then close block
           // file and finalize the block before responding success
           if (lastPacketInBlock) {
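For orientation before the final hunk: the ack construction it introduces can be read in isolation roughly as below. This is a sketch under the same assumed PipelineAck/Status API as above; buildReplies is a hypothetical helper, not something the patch adds.

  // Sketch of the reply construction added in the hunk below; buildReplies is
  // a hypothetical helper, not part of the patch. If the downstream ack could
  // not be read, the responder reports SUCCESS for itself and ERROR for the
  // downstream datanode; otherwise it prepends its own SUCCESS to the replies
  // it received, so the client sees one Status per datanode in the pipeline.
  static Status[] buildReplies(boolean didRead, PipelineAck downstreamAck) {
    if (!didRead) {                       // no ack was read from downstream
      return new Status[]{Status.SUCCESS, Status.ERROR};
    }
    Status[] replies = new Status[1 + downstreamAck.getNumOfReplies()];
    replies[0] = Status.SUCCESS;          // this datanode's own status
    for (int i = 0; i < downstreamAck.getNumOfReplies(); i++) {
      replies[i + 1] = downstreamAck.getReply(i);
    }
    return replies;
  }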
@@ -990,54 +981,42 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
             }
           }

-          // send my status back to upstream datanode
-          ackReply(expected);
-
-          LOG.debug("PacketResponder " + numTargets +
-                    " for block " + block +
-                    " responded my status " +
-                    " for seqno " + expected);
-
-          boolean success = true;
-          // forward responses from downstream datanodes.
-          for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
-            try {
-              if (op == SUCCESS) {
-                op = Status.read(mirrorIn);
-                if (op != SUCCESS) {
-                  success = false;
-                  LOG.debug("PacketResponder for block " + block +
-                            ": error code received from downstream " +
-                            " datanode[" + i + "] " + op);
-                }
-              }
-            } catch (Throwable e) {
-              op = ERROR;
-              success = false;
+          // construct my ack message
+          Status[] replies = null;
+          if (!didRead) { // no ack is read
+            replies = new Status[2];
+            replies[0] = SUCCESS;
+            replies[1] = ERROR;
+          } else {
+            replies = new Status[1+ack.getNumOfReplies()];
+            replies[0] = SUCCESS;
+            for (int i=0; i<ack.getNumOfReplies(); i++) {
+              replies[i+1] = ack.getReply(i);
             }
-            op.write(replyOut);
           }
-          replyOut.flush();
+          PipelineAck replyAck = new PipelineAck(expected, replies);

-          LOG.debug("PacketResponder " + block + " " + numTargets +
-                    " responded other status " + " for seqno " + expected);
-
+          // send my ack back to upstream datanode
+          replyAck.write(replyOut);
+          replyOut.flush();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("PacketResponder " + numTargets +
+                      " for block " + block +
+                      " responded an ack: " + replyAck);
+          }
           if (pkt != null) {
             // remove the packet from the ack queue
             removeAckHead();
             // update bytes acked
-            if (success && pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
+            if (replyAck.isSuccess() &&
+                pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
              replicaInfo.setBytesAcked(pkt.lastByteInBlock);
            }
          }
-          // If we were unable to read the seqno from downstream, then stop.
-          if (expected == -2) {
-            running = false;
-          }
           // If we forwarded an error response from a downstream datanode
           // and we are acting on behalf of a client, then we quit. The
           // client will drive the recovery mechanism.
-          if (op == ERROR && receiver.clientName.length() > 0) {
+          if (!replyAck.isSuccess() && receiver.clientName.length() > 0) {
             running = false;
           }
         } catch (IOException e) {