|
@@ -75,6 +75,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
DatanodeInfo srcDataNode = null;
|
|
DatanodeInfo srcDataNode = null;
|
|
private Checksum partialCrc = null;
|
|
private Checksum partialCrc = null;
|
|
private DataNode datanode = null;
|
|
private DataNode datanode = null;
|
|
|
|
+ volatile private boolean mirrorError;
|
|
|
|
|
|
BlockReceiver(Block block, DataInputStream in, String inAddr,
|
|
BlockReceiver(Block block, DataInputStream in, String inAddr,
|
|
String myAddr, boolean isRecovery, String clientName,
|
|
String myAddr, boolean isRecovery, String clientName,
|
|
@@ -173,21 +174,19 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
|
|
|
/**
|
|
/**
|
|
* While writing to mirrorOut, failure to write to mirror should not
|
|
* While writing to mirrorOut, failure to write to mirror should not
|
|
- * affect this datanode unless a client is writing the block.
|
|
|
|
|
|
+ * affect this datanode.
|
|
*/
|
|
*/
|
|
private void handleMirrorOutError(IOException ioe) throws IOException {
|
|
private void handleMirrorOutError(IOException ioe) throws IOException {
|
|
- LOG.info(datanode.dnRegistration + ":Exception writing block " +
|
|
|
|
|
|
+ LOG.info(datanode.dnRegistration + ": Exception writing block " +
|
|
block + " to mirror " + mirrorAddr + "\n" +
|
|
block + " to mirror " + mirrorAddr + "\n" +
|
|
StringUtils.stringifyException(ioe));
|
|
StringUtils.stringifyException(ioe));
|
|
- mirrorOut = null;
|
|
|
|
- //
|
|
|
|
- // If stream-copy fails, continue
|
|
|
|
- // writing to disk for replication requests. For client
|
|
|
|
- // writes, return error so that the client can do error
|
|
|
|
- // recovery.
|
|
|
|
- //
|
|
|
|
- if (clientName.length() > 0) {
|
|
|
|
|
|
+ if (Thread.interrupted()) { // shut down if the thread is interrupted
|
|
throw ioe;
|
|
throw ioe;
|
|
|
|
+ } else { // encounter an error while writing to mirror
|
|
|
|
+ // continue to run even if we cannot write to the mirror
|
|
|
|
+ // notify client of the error
|
|
|
|
+ // and wait for the client to shut down the pipeline
|
|
|
|
+ mirrorError = true;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -396,8 +395,8 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
|
|
|
setBlockPosition(offsetInBlock);
|
|
setBlockPosition(offsetInBlock);
|
|
|
|
|
|
- //First write the packet to the mirror:
|
|
|
|
- if (mirrorOut != null) {
|
|
|
|
|
|
+ // First write the packet to the mirror:
|
|
|
|
+ if (mirrorOut != null && !mirrorError) {
|
|
try {
|
|
try {
|
|
mirrorOut.write(buf.array(), buf.position(), buf.remaining());
|
|
mirrorOut.write(buf.array(), buf.position(), buf.remaining());
|
|
mirrorOut.flush();
|
|
mirrorOut.flush();
|
|
@@ -515,7 +514,8 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
if (clientName.length() > 0) {
|
|
if (clientName.length() > 0) {
|
|
responder = new Daemon(datanode.threadGroup,
|
|
responder = new Daemon(datanode.threadGroup,
|
|
new PacketResponder(this, block, mirrIn,
|
|
new PacketResponder(this, block, mirrIn,
|
|
- replyOut, numTargets));
|
|
|
|
|
|
+ replyOut, numTargets,
|
|
|
|
+ Thread.currentThread()));
|
|
responder.start(); // start thread to processes reponses
|
|
responder.start(); // start thread to processes reponses
|
|
}
|
|
}
|
|
|
|
|
|
@@ -700,18 +700,21 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
DataOutputStream replyOut; // output to upstream datanode
|
|
DataOutputStream replyOut; // output to upstream datanode
|
|
private int numTargets; // number of downstream datanodes including myself
|
|
private int numTargets; // number of downstream datanodes including myself
|
|
private BlockReceiver receiver; // The owner of this responder.
|
|
private BlockReceiver receiver; // The owner of this responder.
|
|
|
|
+ private Thread receiverThread; // the thread that spawns this responder
|
|
|
|
|
|
public String toString() {
|
|
public String toString() {
|
|
return "PacketResponder " + numTargets + " for Block " + this.block;
|
|
return "PacketResponder " + numTargets + " for Block " + this.block;
|
|
}
|
|
}
|
|
|
|
|
|
PacketResponder(BlockReceiver receiver, Block b, DataInputStream in,
|
|
PacketResponder(BlockReceiver receiver, Block b, DataInputStream in,
|
|
- DataOutputStream out, int numTargets) {
|
|
|
|
|
|
+ DataOutputStream out, int numTargets,
|
|
|
|
+ Thread receiverThread) {
|
|
this.receiver = receiver;
|
|
this.receiver = receiver;
|
|
this.block = b;
|
|
this.block = b;
|
|
mirrorIn = in;
|
|
mirrorIn = in;
|
|
replyOut = out;
|
|
replyOut = out;
|
|
this.numTargets = numTargets;
|
|
this.numTargets = numTargets;
|
|
|
|
+ this.receiverThread = receiverThread;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -848,11 +851,10 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
}
|
|
}
|
|
|
|
|
|
boolean lastPacketInBlock = false;
|
|
boolean lastPacketInBlock = false;
|
|
|
|
+ boolean isInterrupted = false;
|
|
while (running && datanode.shouldRun && !lastPacketInBlock) {
|
|
while (running && datanode.shouldRun && !lastPacketInBlock) {
|
|
|
|
|
|
try {
|
|
try {
|
|
- boolean didRead = false;
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Sequence number -2 is a special value that is used when
|
|
* Sequence number -2 is a special value that is used when
|
|
* a DN fails to read an ack from a downstream. In this case,
|
|
* a DN fails to read an ack from a downstream. In this case,
|
|
@@ -861,31 +863,20 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
* as an UNKNOWN value.
|
|
* as an UNKNOWN value.
|
|
*/
|
|
*/
|
|
long expected = -2;
|
|
long expected = -2;
|
|
|
|
+ long seqno = -2;
|
|
|
|
|
|
PipelineAck ack = new PipelineAck();
|
|
PipelineAck ack = new PipelineAck();
|
|
try {
|
|
try {
|
|
- // read an ack from downstream datanode
|
|
|
|
- ack.readFields(mirrorIn, numTargets);
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("PacketResponder " + numTargets + " got " + ack);
|
|
|
|
- }
|
|
|
|
- long seqno = ack.getSeqno();
|
|
|
|
- didRead = true;
|
|
|
|
- if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
|
|
|
|
- ack.write(replyOut); // send keepalive
|
|
|
|
- replyOut.flush();
|
|
|
|
- continue;
|
|
|
|
- } else if (seqno == -2) {
|
|
|
|
- // A downstream node must have failed to read an ack. We need
|
|
|
|
- // to forward this on.
|
|
|
|
- assert ! ack.isSuccess();
|
|
|
|
- } else {
|
|
|
|
- if (seqno < 0) {
|
|
|
|
- throw new IOException("Received an invalid negative sequence number. "
|
|
|
|
- + "Ack = " + ack);
|
|
|
|
|
|
+ if (!mirrorError) {
|
|
|
|
+ // read an ack from downstream datanode
|
|
|
|
+ ack.readFields(mirrorIn, numTargets);
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("PacketResponder " + numTargets +
|
|
|
|
+ " for block " + block + " got " + ack);
|
|
}
|
|
}
|
|
- assert seqno >= 0;
|
|
|
|
-
|
|
|
|
|
|
+ seqno = ack.getSeqno();
|
|
|
|
+ }
|
|
|
|
+ if (seqno >= 0 || mirrorError) {
|
|
Packet pkt = null;
|
|
Packet pkt = null;
|
|
synchronized (this) {
|
|
synchronized (this) {
|
|
while (running && datanode.shouldRun && ackQueue.size() == 0) {
|
|
while (running && datanode.shouldRun && ackQueue.size() == 0) {
|
|
@@ -897,10 +888,13 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
}
|
|
}
|
|
wait();
|
|
wait();
|
|
}
|
|
}
|
|
|
|
+ if (!running || !datanode.shouldRun) {
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
pkt = ackQueue.removeFirst();
|
|
pkt = ackQueue.removeFirst();
|
|
expected = pkt.seqno;
|
|
expected = pkt.seqno;
|
|
notifyAll();
|
|
notifyAll();
|
|
- if (seqno != expected) {
|
|
|
|
|
|
+ if (seqno != expected && !mirrorError) {
|
|
throw new IOException("PacketResponder " + numTargets +
|
|
throw new IOException("PacketResponder " + numTargets +
|
|
" for block " + block +
|
|
" for block " + block +
|
|
" expected seqno:" + expected +
|
|
" expected seqno:" + expected +
|
|
@@ -909,27 +903,32 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
lastPacketInBlock = pkt.lastPacketInBlock;
|
|
lastPacketInBlock = pkt.lastPacketInBlock;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- } catch (Throwable e) {
|
|
|
|
- if (running) {
|
|
|
|
- LOG.info("PacketResponder " + block + " " + numTargets +
|
|
|
|
- " Exception " + StringUtils.stringifyException(e));
|
|
|
|
- running = false;
|
|
|
|
|
|
+ } catch (InterruptedException ine) {
|
|
|
|
+ isInterrupted = true;
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ if (Thread.interrupted()) {
|
|
|
|
+ isInterrupted = true;
|
|
|
|
+ } else {
|
|
|
|
+ // continue to run even if we cannot read from the mirror
|
|
|
|
+ // notify client of the error
|
|
|
|
+ // and wait for the client to shut down the pipeline
|
|
|
|
+ mirrorError = true;
|
|
|
|
+ LOG.info("PacketResponder " + block + " " + numTargets +
|
|
|
|
+ " Exception " + StringUtils.stringifyException(ioe));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- if (Thread.interrupted()) {
|
|
|
|
|
|
+ if (Thread.interrupted() || isInterrupted) {
|
|
/* The receiver thread cancelled this thread.
|
|
/* The receiver thread cancelled this thread.
|
|
* We could also check any other status updates from the
|
|
* We could also check any other status updates from the
|
|
* receiver thread (e.g. if it is ok to write to replyOut).
|
|
* receiver thread (e.g. if it is ok to write to replyOut).
|
|
* It is prudent to not send any more status back to the client
|
|
* It is prudent to not send any more status back to the client
|
|
* because this datanode has a problem. The upstream datanode
|
|
* because this datanode has a problem. The upstream datanode
|
|
- * will detect a timout on heartbeats and will declare that
|
|
|
|
- * this datanode is bad, and rightly so.
|
|
|
|
|
|
+ * will detect that this datanode is bad, and rightly so.
|
|
*/
|
|
*/
|
|
LOG.info("PacketResponder " + block + " " + numTargets +
|
|
LOG.info("PacketResponder " + block + " " + numTargets +
|
|
" : Thread is interrupted.");
|
|
" : Thread is interrupted.");
|
|
- running = false;
|
|
|
|
- continue;
|
|
|
|
|
|
+ break;
|
|
}
|
|
}
|
|
|
|
|
|
// If this is the last packet in block, then close block
|
|
// If this is the last packet in block, then close block
|
|
@@ -954,23 +953,25 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- // construct my ack message.
|
|
|
|
- short[] replies = new short[1 + numTargets];
|
|
|
|
- if (!didRead) { // no ack is read
|
|
|
|
- replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
|
|
|
|
- // Fill all downstream nodes with ERROR - the client will
|
|
|
|
- // eject the first node with ERROR status (our mirror)
|
|
|
|
- for (int i = 1; i < replies.length; i++) {
|
|
|
|
- replies[i] = DataTransferProtocol.OP_STATUS_ERROR;
|
|
|
|
- }
|
|
|
|
|
|
+ PipelineAck replyAck;
|
|
|
|
+ if (seqno == PipelineAck.HEART_BEAT.getSeqno()) {
|
|
|
|
+ replyAck = ack; // continue to send keep alive
|
|
} else {
|
|
} else {
|
|
- replies = new short[1+numTargets];
|
|
|
|
- replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
|
|
|
|
- for (int i=0; i<numTargets; i++) {
|
|
|
|
- replies[i+1] = ack.getReply(i);
|
|
|
|
|
|
+ // construct my ack message
|
|
|
|
+ short[] replies = null;
|
|
|
|
+ if (mirrorError) { // no ack is read
|
|
|
|
+ replies = new short[2];
|
|
|
|
+ replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
|
|
|
|
+ replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
|
|
|
|
+ } else {
|
|
|
|
+ replies = new short[1+ack.getNumOfReplies()];
|
|
|
|
+ replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
|
|
|
|
+ for (int i=0; i<ack.getNumOfReplies(); i++) {
|
|
|
|
+ replies[i+1] = ack.getReply(i);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ replyAck = new PipelineAck(expected, replies);
|
|
}
|
|
}
|
|
- PipelineAck replyAck = new PipelineAck(expected, replies);
|
|
|
|
|
|
|
|
// send my ack back to upstream datanode
|
|
// send my ack back to upstream datanode
|
|
replyAck.write(replyOut);
|
|
replyAck.write(replyOut);
|
|
@@ -980,24 +981,15 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
" for block " + block +
|
|
" for block " + block +
|
|
" responded an ack: " + replyAck);
|
|
" responded an ack: " + replyAck);
|
|
}
|
|
}
|
|
-
|
|
|
|
- // If we forwarded an error response from a downstream datanode
|
|
|
|
- // and we are acting on behalf of a client, then we quit. The
|
|
|
|
- // client will drive the recovery mechanism.
|
|
|
|
- if (!replyAck.isSuccess() && receiver.clientName.length() > 0) {
|
|
|
|
- running = false;
|
|
|
|
- }
|
|
|
|
- } catch (IOException e) {
|
|
|
|
|
|
+ } catch (Throwable e) {
|
|
|
|
+ LOG.warn("IOException in BlockReceiver.run(): ", e);
|
|
if (running) {
|
|
if (running) {
|
|
LOG.info("PacketResponder " + block + " " + numTargets +
|
|
LOG.info("PacketResponder " + block + " " + numTargets +
|
|
" Exception " + StringUtils.stringifyException(e));
|
|
" Exception " + StringUtils.stringifyException(e));
|
|
running = false;
|
|
running = false;
|
|
}
|
|
}
|
|
- } catch (RuntimeException e) {
|
|
|
|
- if (running) {
|
|
|
|
- LOG.info("PacketResponder " + block + " " + numTargets +
|
|
|
|
- " Exception " + StringUtils.stringifyException(e));
|
|
|
|
- running = false;
|
|
|
|
|
|
+ if (!Thread.interrupted()) { // error not caused by interruption
|
|
|
|
+ receiverThread.interrupt();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|