|
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
|
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
|
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.util.Daemon;
|
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
@@ -773,8 +774,13 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
// send a heartbeat if it is time.
|
|
|
now = System.currentTimeMillis();
|
|
|
if (now - lastHeartbeat > datanode.socketTimeout/2) {
|
|
|
- replyOut.writeLong(-1); // send heartbeat
|
|
|
+ PipelineAck.HEART_BEAT.write(replyOut); // send heart beat
|
|
|
replyOut.flush();
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("PacketResponder " + numTargets +
|
|
|
+ " for block " + block +
|
|
|
+ " sent a heartbeat");
|
|
|
+ }
|
|
|
lastHeartbeat = now;
|
|
|
}
|
|
|
}
|
|
@@ -814,8 +820,8 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
lastPacket = true;
|
|
|
}
|
|
|
|
|
|
- replyOut.writeLong(expected);
|
|
|
- replyOut.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
|
|
|
+ new PipelineAck(expected, new short[]{
|
|
|
+ DataTransferProtocol.OP_STATUS_SUCCESS}).write(replyOut);
|
|
|
replyOut.flush();
|
|
|
} catch (Exception e) {
|
|
|
if (running) {
|
|
@@ -845,23 +851,41 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
while (running && datanode.shouldRun && !lastPacketInBlock) {
|
|
|
|
|
|
try {
|
|
|
- short op = DataTransferProtocol.OP_STATUS_SUCCESS;
|
|
|
boolean didRead = false;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Sequence number -2 is a special value that is used when
|
|
|
+ * a DN fails to read an ack from a downstream. In this case,
|
|
|
+ * it needs to tell the client that there's been an error downstream
|
|
|
+ * but has no valid sequence number to use. Thus, -2 is used
|
|
|
+ * as an UNKNOWN value.
|
|
|
+ */
|
|
|
long expected = -2;
|
|
|
+
|
|
|
+ PipelineAck ack = new PipelineAck();
|
|
|
try {
|
|
|
- // read seqno from downstream datanode
|
|
|
- long seqno = mirrorIn.readLong();
|
|
|
+ // read an ack from downstream datanode
|
|
|
+ ack.readFields(mirrorIn, numTargets);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("PacketResponder " + numTargets + " got " + ack);
|
|
|
+ }
|
|
|
+ long seqno = ack.getSeqno();
|
|
|
didRead = true;
|
|
|
- if (seqno == -1) {
|
|
|
- replyOut.writeLong(-1); // send keepalive
|
|
|
+ if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
|
|
|
+ ack.write(replyOut); // send keepalive
|
|
|
replyOut.flush();
|
|
|
- LOG.debug("PacketResponder " + numTargets + " got -1");
|
|
|
continue;
|
|
|
} else if (seqno == -2) {
|
|
|
- LOG.debug("PacketResponder " + numTargets + " got -2");
|
|
|
+ // A downstream node must have failed to read an ack. We need
|
|
|
+ // to forward this on.
|
|
|
+ assert ! ack.isSuccess();
|
|
|
} else {
|
|
|
- LOG.debug("PacketResponder " + numTargets + " got seqno = " +
|
|
|
- seqno);
|
|
|
+ if (seqno < 0) {
|
|
|
+ throw new IOException("Received an invalid negative sequence number. "
|
|
|
+ + "Ack = " + ack);
|
|
|
+ }
|
|
|
+ assert seqno >= 0;
|
|
|
+
|
|
|
Packet pkt = null;
|
|
|
synchronized (this) {
|
|
|
while (running && datanode.shouldRun && ackQueue.size() == 0) {
|
|
@@ -876,7 +900,6 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
pkt = ackQueue.removeFirst();
|
|
|
expected = pkt.seqno;
|
|
|
notifyAll();
|
|
|
- LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
|
|
|
if (seqno != expected) {
|
|
|
throw new IOException("PacketResponder " + numTargets +
|
|
|
" for block " + block +
|
|
@@ -909,10 +932,6 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- if (!didRead) {
|
|
|
- op = DataTransferProtocol.OP_STATUS_ERROR;
|
|
|
- }
|
|
|
-
|
|
|
// If this is the last packet in block, then close block
|
|
|
// file and finalize the block before responding success
|
|
|
if (lastPacketInBlock && !receiver.finalized) {
|
|
@@ -935,43 +954,37 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // send my status back to upstream datanode
|
|
|
- replyOut.writeLong(expected); // send seqno upstream
|
|
|
- replyOut.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
|
|
|
-
|
|
|
- LOG.debug("PacketResponder " + numTargets +
|
|
|
- " for block " + block +
|
|
|
- " responded my status " +
|
|
|
- " for seqno " + expected);
|
|
|
-
|
|
|
- // forward responses from downstream datanodes.
|
|
|
- for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
|
|
|
- try {
|
|
|
- if (op == DataTransferProtocol.OP_STATUS_SUCCESS) {
|
|
|
- op = mirrorIn.readShort();
|
|
|
- if (op != DataTransferProtocol.OP_STATUS_SUCCESS) {
|
|
|
- LOG.debug("PacketResponder for block " + block +
|
|
|
- ": error code received from downstream " +
|
|
|
- " datanode[" + i + "] " + op);
|
|
|
- }
|
|
|
- }
|
|
|
- } catch (Throwable e) {
|
|
|
- op = DataTransferProtocol.OP_STATUS_ERROR;
|
|
|
+ // construct my ack message.
|
|
|
+ short[] replies = new short[1 + numTargets];
|
|
|
+ if (!didRead) { // no ack is read
|
|
|
+ replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
|
|
|
+ // Fill all downstream nodes with ERROR - the client will
|
|
|
+ // eject the first node with ERROR status (our mirror)
|
|
|
+ for (int i = 1; i < replies.length; i++) {
|
|
|
+ replies[i] = DataTransferProtocol.OP_STATUS_ERROR;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ replies = new short[1+numTargets];
|
|
|
+ replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
|
|
|
+ for (int i=0; i<numTargets; i++) {
|
|
|
+ replies[i+1] = ack.getReply(i);
|
|
|
}
|
|
|
- replyOut.writeShort(op);
|
|
|
}
|
|
|
+ PipelineAck replyAck = new PipelineAck(expected, replies);
|
|
|
+
|
|
|
+ // send my ack back to upstream datanode
|
|
|
+ replyAck.write(replyOut);
|
|
|
replyOut.flush();
|
|
|
- LOG.debug("PacketResponder " + block + " " + numTargets +
|
|
|
- " responded other status " + " for seqno " + expected);
|
|
|
-
|
|
|
- // If we were unable to read the seqno from downstream, then stop.
|
|
|
- if (expected == -2) {
|
|
|
- running = false;
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("PacketResponder " + numTargets +
|
|
|
+ " for block " + block +
|
|
|
+ " responded an ack: " + replyAck);
|
|
|
}
|
|
|
+
|
|
|
// If we forwarded an error response from a downstream datanode
|
|
|
// and we are acting on behalf of a client, then we quit. The
|
|
|
// client will drive the recovery mechanism.
|
|
|
- if (op == DataTransferProtocol.OP_STATUS_ERROR && receiver.clientName.length() > 0) {
|
|
|
+ if (!replyAck.isSuccess() && receiver.clientName.length() > 0) {
|
|
|
running = false;
|
|
|
}
|
|
|
} catch (IOException e) {
|