|
@@ -35,11 +35,11 @@ import org.apache.commons.logging.Log;
|
|
import org.apache.hadoop.fs.FSInputChecker;
|
|
import org.apache.hadoop.fs.FSInputChecker;
|
|
import org.apache.hadoop.fs.FSOutputSummer;
|
|
import org.apache.hadoop.fs.FSOutputSummer;
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
import org.apache.hadoop.hdfs.protocol.Block;
|
|
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
|
|
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
|
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
|
|
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
|
|
|
|
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
|
|
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
|
|
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
|
|
import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.io.IOUtils;
|
|
import org.apache.hadoop.util.Daemon;
|
|
import org.apache.hadoop.util.Daemon;
|
|
@@ -77,6 +77,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
private Checksum partialCrc = null;
|
|
private Checksum partialCrc = null;
|
|
private final DataNode datanode;
|
|
private final DataNode datanode;
|
|
final private ReplicaInPipelineInterface replicaInfo;
|
|
final private ReplicaInPipelineInterface replicaInfo;
|
|
|
|
+ volatile private boolean mirrorError;
|
|
|
|
|
|
BlockReceiver(Block block, DataInputStream in, String inAddr,
|
|
BlockReceiver(Block block, DataInputStream in, String inAddr,
|
|
String myAddr, BlockConstructionStage stage,
|
|
String myAddr, BlockConstructionStage stage,
|
|
@@ -217,21 +218,19 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
|
|
|
/**
|
|
/**
|
|
* While writing to mirrorOut, failure to write to mirror should not
|
|
* While writing to mirrorOut, failure to write to mirror should not
|
|
- * affect this datanode unless a client is writing the block.
|
|
|
|
|
|
+ * affect this datanode unless it is caused by interruption.
|
|
*/
|
|
*/
|
|
private void handleMirrorOutError(IOException ioe) throws IOException {
|
|
private void handleMirrorOutError(IOException ioe) throws IOException {
|
|
LOG.info(datanode.dnRegistration + ":Exception writing block " +
|
|
LOG.info(datanode.dnRegistration + ":Exception writing block " +
|
|
block + " to mirror " + mirrorAddr + "\n" +
|
|
block + " to mirror " + mirrorAddr + "\n" +
|
|
StringUtils.stringifyException(ioe));
|
|
StringUtils.stringifyException(ioe));
|
|
- mirrorOut = null;
|
|
|
|
- //
|
|
|
|
- // If stream-copy fails, continue
|
|
|
|
- // writing to disk for replication requests. For client
|
|
|
|
- // writes, return error so that the client can do error
|
|
|
|
- // recovery.
|
|
|
|
- //
|
|
|
|
- if (clientName.length() > 0) {
|
|
|
|
|
|
+ if (Thread.interrupted()) { // shut down if the thread is interrupted
|
|
throw ioe;
|
|
throw ioe;
|
|
|
|
+ } else { // encounter an error while writing to mirror
|
|
|
|
+ // continue to run even if can not write to mirror
|
|
|
|
+ // notify client of the error
|
|
|
|
+ // and wait for the client to shut down the pipeline
|
|
|
|
+ mirrorError = true;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -433,6 +432,14 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
return receivePacket(offsetInBlock, seqno, lastPacketInBlock, len, endOfHeader);
|
|
return receivePacket(offsetInBlock, seqno, lastPacketInBlock, len, endOfHeader);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Write the received packet to disk (data only)
|
|
|
|
+ */
|
|
|
|
+ private void writePacketToDisk(byte[] pktBuf, int startByteToDisk,
|
|
|
|
+ int numBytesToDisk) throws IOException {
|
|
|
|
+ out.write(pktBuf, startByteToDisk, numBytesToDisk);
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Receives and processes a packet. It can contain many chunks.
|
|
* Receives and processes a packet. It can contain many chunks.
|
|
* returns the number of data bytes that the packet has.
|
|
* returns the number of data bytes that the packet has.
|
|
@@ -461,7 +468,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
}
|
|
}
|
|
|
|
|
|
//First write the packet to the mirror:
|
|
//First write the packet to the mirror:
|
|
- if (mirrorOut != null) {
|
|
|
|
|
|
+ if (mirrorOut != null && !mirrorError) {
|
|
try {
|
|
try {
|
|
mirrorOut.write(buf.array(), buf.position(), buf.remaining());
|
|
mirrorOut.write(buf.array(), buf.position(), buf.remaining());
|
|
mirrorOut.flush();
|
|
mirrorOut.flush();
|
|
@@ -469,7 +476,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
handleMirrorOutError(e);
|
|
handleMirrorOutError(e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
buf.position(endOfHeader);
|
|
buf.position(endOfHeader);
|
|
|
|
|
|
if (lastPacketInBlock || len == 0) {
|
|
if (lastPacketInBlock || len == 0) {
|
|
@@ -525,7 +532,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
|
|
|
|
int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock);
|
|
int startByteToDisk = dataOff+(int)(onDiskLen-firstByteInBlock);
|
|
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
|
|
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
|
|
- out.write(pktBuf, startByteToDisk, numBytesToDisk);
|
|
|
|
|
|
+ writePacketToDisk(pktBuf, startByteToDisk, numBytesToDisk);
|
|
|
|
|
|
// If this is a partial chunk, then verify that this is the only
|
|
// If this is a partial chunk, then verify that this is the only
|
|
// chunk in the packet. Calculate new crc for this chunk.
|
|
// chunk in the packet. Calculate new crc for this chunk.
|
|
@@ -560,7 +567,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
throttler.throttle(len);
|
|
throttler.throttle(len);
|
|
}
|
|
}
|
|
|
|
|
|
- return len;
|
|
|
|
|
|
+ return lastPacketInBlock?-1:len;
|
|
}
|
|
}
|
|
|
|
|
|
void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
|
|
void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
|
|
@@ -584,14 +591,15 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
if (clientName.length() > 0) {
|
|
if (clientName.length() > 0) {
|
|
responder = new Daemon(datanode.threadGroup,
|
|
responder = new Daemon(datanode.threadGroup,
|
|
new PacketResponder(this, block, mirrIn,
|
|
new PacketResponder(this, block, mirrIn,
|
|
- replyOut, numTargets));
|
|
|
|
|
|
+ replyOut, numTargets,
|
|
|
|
+ Thread.currentThread()));
|
|
responder.start(); // start thread to processes reponses
|
|
responder.start(); // start thread to processes reponses
|
|
}
|
|
}
|
|
|
|
|
|
/*
|
|
/*
|
|
- * Receive until packet has zero bytes of data.
|
|
|
|
|
|
+ * Receive until the last packet.
|
|
*/
|
|
*/
|
|
- while (receivePacket() > 0) {}
|
|
|
|
|
|
+ while (receivePacket() >= 0) {}
|
|
|
|
|
|
// wait for all outstanding packet responses. And then
|
|
// wait for all outstanding packet responses. And then
|
|
// indicate responder to gracefully shutdown.
|
|
// indicate responder to gracefully shutdown.
|
|
@@ -729,13 +737,16 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
DataOutputStream replyOut; // output to upstream datanode
|
|
DataOutputStream replyOut; // output to upstream datanode
|
|
private int numTargets; // number of downstream datanodes including myself
|
|
private int numTargets; // number of downstream datanodes including myself
|
|
private BlockReceiver receiver; // The owner of this responder.
|
|
private BlockReceiver receiver; // The owner of this responder.
|
|
|
|
+ private Thread receiverThread; // the thread that spawns this responder
|
|
|
|
|
|
public String toString() {
|
|
public String toString() {
|
|
return "PacketResponder " + numTargets + " for Block " + this.block;
|
|
return "PacketResponder " + numTargets + " for Block " + this.block;
|
|
}
|
|
}
|
|
|
|
|
|
PacketResponder(BlockReceiver receiver, Block b, DataInputStream in,
|
|
PacketResponder(BlockReceiver receiver, Block b, DataInputStream in,
|
|
- DataOutputStream out, int numTargets) {
|
|
|
|
|
|
+ DataOutputStream out, int numTargets,
|
|
|
|
+ Thread receiverThread) {
|
|
|
|
+ this.receiverThread = receiverThread;
|
|
this.receiver = receiver;
|
|
this.receiver = receiver;
|
|
this.block = b;
|
|
this.block = b;
|
|
mirrorIn = in;
|
|
mirrorIn = in;
|
|
@@ -775,145 +786,31 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
notifyAll();
|
|
notifyAll();
|
|
}
|
|
}
|
|
|
|
|
|
- private synchronized void lastDataNodeRun() {
|
|
|
|
- long lastHeartbeat = System.currentTimeMillis();
|
|
|
|
- boolean lastPacket = false;
|
|
|
|
- final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
|
|
|
|
-
|
|
|
|
- while (running && datanode.shouldRun && !lastPacket) {
|
|
|
|
- long now = System.currentTimeMillis();
|
|
|
|
- try {
|
|
|
|
-
|
|
|
|
- // wait for a packet to be sent to downstream datanode
|
|
|
|
- while (running && datanode.shouldRun && ackQueue.size() == 0) {
|
|
|
|
- long idle = now - lastHeartbeat;
|
|
|
|
- long timeout = (datanode.socketTimeout/2) - idle;
|
|
|
|
- if (timeout <= 0) {
|
|
|
|
- timeout = 1000;
|
|
|
|
- }
|
|
|
|
- try {
|
|
|
|
- wait(timeout);
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- if (running) {
|
|
|
|
- LOG.info("PacketResponder " + numTargets +
|
|
|
|
- " for block " + block + " Interrupted.");
|
|
|
|
- running = false;
|
|
|
|
- }
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // send a heartbeat if it is time.
|
|
|
|
- now = System.currentTimeMillis();
|
|
|
|
- if (now - lastHeartbeat > datanode.socketTimeout/2) {
|
|
|
|
- replyOut.writeLong(-1); // send heartbeat
|
|
|
|
- replyOut.flush();
|
|
|
|
- lastHeartbeat = now;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (!running || !datanode.shouldRun) {
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- Packet pkt = ackQueue.getFirst();
|
|
|
|
- long expected = pkt.seqno;
|
|
|
|
- LOG.debug("PacketResponder " + numTargets +
|
|
|
|
- " for block " + block +
|
|
|
|
- " acking for packet " + expected);
|
|
|
|
-
|
|
|
|
- // If this is the last packet in block, then close block
|
|
|
|
- // file and finalize the block before responding success
|
|
|
|
- if (pkt.lastPacketInBlock) {
|
|
|
|
- receiver.close();
|
|
|
|
- final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
|
|
|
|
- block.setNumBytes(replicaInfo.getNumBytes());
|
|
|
|
- datanode.data.finalizeBlock(block);
|
|
|
|
- datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
|
|
|
|
- if (ClientTraceLog.isInfoEnabled() &&
|
|
|
|
- receiver.clientName.length() > 0) {
|
|
|
|
- long offset = 0;
|
|
|
|
- ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
|
|
|
|
- receiver.inAddr, receiver.myAddr, block.getNumBytes(),
|
|
|
|
- "HDFS_WRITE", receiver.clientName, offset,
|
|
|
|
- datanode.dnRegistration.getStorageID(), block, endTime-startTime));
|
|
|
|
- } else {
|
|
|
|
- LOG.info("Received block " + block +
|
|
|
|
- " of size " + block.getNumBytes() +
|
|
|
|
- " from " + receiver.inAddr);
|
|
|
|
- }
|
|
|
|
- lastPacket = true;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- ackReply(expected);
|
|
|
|
- replyOut.flush();
|
|
|
|
- // remove the packet from the ack queue
|
|
|
|
- removeAckHead();
|
|
|
|
- // update the bytes acked
|
|
|
|
- if (pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
|
|
|
|
- replicaInfo.setBytesAcked(pkt.lastByteInBlock);
|
|
|
|
- }
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
|
|
|
|
- if (running) {
|
|
|
|
- try {
|
|
|
|
- datanode.checkDiskError(e); // may throw an exception here
|
|
|
|
- } catch (IOException ioe) {
|
|
|
|
- LOG.warn("DataNode.chekDiskError failed in lastDataNodeRun with: ",
|
|
|
|
- ioe);
|
|
|
|
- }
|
|
|
|
- LOG.info("PacketResponder " + block + " " + numTargets +
|
|
|
|
- " Exception " + StringUtils.stringifyException(e));
|
|
|
|
- running = false;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- LOG.info("PacketResponder " + numTargets +
|
|
|
|
- " for block " + block + " terminating");
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // This method is introduced to facilitate testing. Otherwise
|
|
|
|
- // there was a little chance to bind an AspectJ advice to such a sequence
|
|
|
|
- // of calls
|
|
|
|
- private void ackReply(long expected) throws IOException {
|
|
|
|
- replyOut.writeLong(expected);
|
|
|
|
- SUCCESS.write(replyOut);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Thread to process incoming acks.
|
|
* Thread to process incoming acks.
|
|
* @see java.lang.Runnable#run()
|
|
* @see java.lang.Runnable#run()
|
|
*/
|
|
*/
|
|
public void run() {
|
|
public void run() {
|
|
-
|
|
|
|
- // If this is the last datanode in pipeline, then handle differently
|
|
|
|
- if (numTargets == 0) {
|
|
|
|
- lastDataNodeRun();
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
boolean lastPacketInBlock = false;
|
|
boolean lastPacketInBlock = false;
|
|
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
|
|
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
|
|
while (running && datanode.shouldRun && !lastPacketInBlock) {
|
|
while (running && datanode.shouldRun && !lastPacketInBlock) {
|
|
|
|
|
|
boolean isInterrupted = false;
|
|
boolean isInterrupted = false;
|
|
try {
|
|
try {
|
|
- DataTransferProtocol.Status op = SUCCESS;
|
|
|
|
- boolean didRead = false;
|
|
|
|
Packet pkt = null;
|
|
Packet pkt = null;
|
|
long expected = -2;
|
|
long expected = -2;
|
|
- try {
|
|
|
|
- // read seqno from downstream datanode
|
|
|
|
- long seqno = mirrorIn.readLong();
|
|
|
|
- didRead = true;
|
|
|
|
- if (seqno == -1) {
|
|
|
|
- replyOut.writeLong(-1); // send keepalive
|
|
|
|
- replyOut.flush();
|
|
|
|
- LOG.debug("PacketResponder " + numTargets + " got -1");
|
|
|
|
- continue;
|
|
|
|
- } else if (seqno == -2) {
|
|
|
|
- LOG.debug("PacketResponder " + numTargets + " got -2");
|
|
|
|
- } else {
|
|
|
|
- LOG.debug("PacketResponder " + numTargets + " got seqno = " +
|
|
|
|
- seqno);
|
|
|
|
|
|
+ PipelineAck ack = new PipelineAck();
|
|
|
|
+ long seqno = PipelineAck.UNKOWN_SEQNO;
|
|
|
|
+ try {
|
|
|
|
+ if (numTargets != 0 && !mirrorError) {// not the last DN & no mirror error
|
|
|
|
+ // read an ack from downstream datanode
|
|
|
|
+ ack.readFields(mirrorIn);
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("PacketResponder " + numTargets + " got " + ack);
|
|
|
|
+ }
|
|
|
|
+ seqno = ack.getSeqno();
|
|
|
|
+ }
|
|
|
|
+ if (seqno != PipelineAck.UNKOWN_SEQNO || numTargets == 0) {
|
|
synchronized (this) {
|
|
synchronized (this) {
|
|
while (running && datanode.shouldRun && ackQueue.size() == 0) {
|
|
while (running && datanode.shouldRun && ackQueue.size() == 0) {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -922,17 +819,14 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
" for block " + block +
|
|
" for block " + block +
|
|
" waiting for local datanode to finish write.");
|
|
" waiting for local datanode to finish write.");
|
|
}
|
|
}
|
|
- try {
|
|
|
|
- wait();
|
|
|
|
- } catch (InterruptedException e) {
|
|
|
|
- isInterrupted = true;
|
|
|
|
- throw e;
|
|
|
|
- }
|
|
|
|
|
|
+ wait();
|
|
|
|
+ }
|
|
|
|
+ if (!running || !datanode.shouldRun) {
|
|
|
|
+ break;
|
|
}
|
|
}
|
|
pkt = ackQueue.getFirst();
|
|
pkt = ackQueue.getFirst();
|
|
expected = pkt.seqno;
|
|
expected = pkt.seqno;
|
|
- LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
|
|
|
|
- if (seqno != expected) {
|
|
|
|
|
|
+ if (numTargets > 0 && seqno != expected) {
|
|
throw new IOException("PacketResponder " + numTargets +
|
|
throw new IOException("PacketResponder " + numTargets +
|
|
" for block " + block +
|
|
" for block " + block +
|
|
" expected seqno:" + expected +
|
|
" expected seqno:" + expected +
|
|
@@ -941,11 +835,18 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
lastPacketInBlock = pkt.lastPacketInBlock;
|
|
lastPacketInBlock = pkt.lastPacketInBlock;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- } catch (Throwable e) {
|
|
|
|
- if (running) {
|
|
|
|
|
|
+ } catch (InterruptedException ine) {
|
|
|
|
+ isInterrupted = true;
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ if (Thread.interrupted()) {
|
|
|
|
+ isInterrupted = true;
|
|
|
|
+ } else {
|
|
|
|
+ // continue to run even if can not read from mirror
|
|
|
|
+ // notify client of the error
|
|
|
|
+ // and wait for the client to shut down the pipeline
|
|
|
|
+ mirrorError = true;
|
|
LOG.info("PacketResponder " + block + " " + numTargets +
|
|
LOG.info("PacketResponder " + block + " " + numTargets +
|
|
- " Exception " + StringUtils.stringifyException(e));
|
|
|
|
- running = false;
|
|
|
|
|
|
+ " Exception " + StringUtils.stringifyException(ioe));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -955,8 +856,7 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
* receiver thread (e.g. if it is ok to write to replyOut).
|
|
* receiver thread (e.g. if it is ok to write to replyOut).
|
|
* It is prudent to not send any more status back to the client
|
|
* It is prudent to not send any more status back to the client
|
|
* because this datanode has a problem. The upstream datanode
|
|
* because this datanode has a problem. The upstream datanode
|
|
- * will detect a timout on heartbeats and will declare that
|
|
|
|
- * this datanode is bad, and rightly so.
|
|
|
|
|
|
+ * will detect that this datanode is bad, and rightly so.
|
|
*/
|
|
*/
|
|
LOG.info("PacketResponder " + block + " " + numTargets +
|
|
LOG.info("PacketResponder " + block + " " + numTargets +
|
|
" : Thread is interrupted.");
|
|
" : Thread is interrupted.");
|
|
@@ -964,10 +864,6 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
|
|
- if (!didRead) {
|
|
|
|
- op = ERROR;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
// If this is the last packet in block, then close block
|
|
// If this is the last packet in block, then close block
|
|
// file and finalize the block before responding success
|
|
// file and finalize the block before responding success
|
|
if (lastPacketInBlock) {
|
|
if (lastPacketInBlock) {
|
|
@@ -990,56 +886,39 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- // send my status back to upstream datanode
|
|
|
|
- ackReply(expected);
|
|
|
|
-
|
|
|
|
- LOG.debug("PacketResponder " + numTargets +
|
|
|
|
- " for block " + block +
|
|
|
|
- " responded my status " +
|
|
|
|
- " for seqno " + expected);
|
|
|
|
-
|
|
|
|
- boolean success = true;
|
|
|
|
- // forward responses from downstream datanodes.
|
|
|
|
- for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
|
|
|
|
- try {
|
|
|
|
- if (op == SUCCESS) {
|
|
|
|
- op = Status.read(mirrorIn);
|
|
|
|
- if (op != SUCCESS) {
|
|
|
|
- success = false;
|
|
|
|
- LOG.debug("PacketResponder for block " + block +
|
|
|
|
- ": error code received from downstream " +
|
|
|
|
- " datanode[" + i + "] " + op);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- } catch (Throwable e) {
|
|
|
|
- op = ERROR;
|
|
|
|
- success = false;
|
|
|
|
|
|
+ // construct my ack message
|
|
|
|
+ Status[] replies = null;
|
|
|
|
+ if (mirrorError) { // ack read error
|
|
|
|
+ replies = new Status[2];
|
|
|
|
+ replies[0] = SUCCESS;
|
|
|
|
+ replies[1] = ERROR;
|
|
|
|
+ } else {
|
|
|
|
+ short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
|
|
|
|
+ replies = new Status[1+ackLen];
|
|
|
|
+ replies[0] = SUCCESS;
|
|
|
|
+ for (int i=0; i<ackLen; i++) {
|
|
|
|
+ replies[i+1] = ack.getReply(i);
|
|
}
|
|
}
|
|
- op.write(replyOut);
|
|
|
|
}
|
|
}
|
|
- replyOut.flush();
|
|
|
|
|
|
+ PipelineAck replyAck = new PipelineAck(expected, replies);
|
|
|
|
|
|
- LOG.debug("PacketResponder " + block + " " + numTargets +
|
|
|
|
- " responded other status " + " for seqno " + expected);
|
|
|
|
-
|
|
|
|
|
|
+ // send my ack back to upstream datanode
|
|
|
|
+ replyAck.write(replyOut);
|
|
|
|
+ replyOut.flush();
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("PacketResponder " + numTargets +
|
|
|
|
+ " for block " + block +
|
|
|
|
+ " responded an ack: " + replyAck);
|
|
|
|
+ }
|
|
if (pkt != null) {
|
|
if (pkt != null) {
|
|
// remove the packet from the ack queue
|
|
// remove the packet from the ack queue
|
|
removeAckHead();
|
|
removeAckHead();
|
|
// update bytes acked
|
|
// update bytes acked
|
|
- if (success && pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
|
|
|
|
|
|
+ if (replyAck.isSuccess() &&
|
|
|
|
+ pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
|
|
replicaInfo.setBytesAcked(pkt.lastByteInBlock);
|
|
replicaInfo.setBytesAcked(pkt.lastByteInBlock);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- // If we were unable to read the seqno from downstream, then stop.
|
|
|
|
- if (expected == -2) {
|
|
|
|
- running = false;
|
|
|
|
- }
|
|
|
|
- // If we forwarded an error response from a downstream datanode
|
|
|
|
- // and we are acting on behalf of a client, then we quit. The
|
|
|
|
- // client will drive the recovery mechanism.
|
|
|
|
- if (op == ERROR && receiver.clientName.length() > 0) {
|
|
|
|
- running = false;
|
|
|
|
- }
|
|
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
LOG.warn("IOException in BlockReceiver.run(): ", e);
|
|
LOG.warn("IOException in BlockReceiver.run(): ", e);
|
|
if (running) {
|
|
if (running) {
|
|
@@ -1051,12 +930,16 @@ class BlockReceiver implements java.io.Closeable, FSConstants {
|
|
LOG.info("PacketResponder " + block + " " + numTargets +
|
|
LOG.info("PacketResponder " + block + " " + numTargets +
|
|
" Exception " + StringUtils.stringifyException(e));
|
|
" Exception " + StringUtils.stringifyException(e));
|
|
running = false;
|
|
running = false;
|
|
|
|
+ if (!Thread.interrupted()) { // failure not caused by interruption
|
|
|
|
+ receiverThread.interrupt();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- } catch (RuntimeException e) {
|
|
|
|
|
|
+ } catch (Throwable e) {
|
|
if (running) {
|
|
if (running) {
|
|
LOG.info("PacketResponder " + block + " " + numTargets +
|
|
LOG.info("PacketResponder " + block + " " + numTargets +
|
|
" Exception " + StringUtils.stringifyException(e));
|
|
" Exception " + StringUtils.stringifyException(e));
|
|
running = false;
|
|
running = false;
|
|
|
|
+ receiverThread.interrupt();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|