|
@@ -327,7 +327,8 @@ class BlockReceiver implements Closeable {
|
|
clientChecksum.update(dataBuf, dataOff, chunkLen);
|
|
clientChecksum.update(dataBuf, dataOff, chunkLen);
|
|
|
|
|
|
if (!clientChecksum.compare(checksumBuf, checksumOff)) {
|
|
if (!clientChecksum.compare(checksumBuf, checksumOff)) {
|
|
- if (srcDataNode != null) {
|
|
|
|
|
|
+ // No need to report to namenode when client is writing.
|
|
|
|
+ if (srcDataNode != null && isDatanode) {
|
|
try {
|
|
try {
|
|
LOG.info("report corrupt block " + block + " from datanode " +
|
|
LOG.info("report corrupt block " + block + " from datanode " +
|
|
srcDataNode + " to namenode");
|
|
srcDataNode + " to namenode");
|
|
@@ -503,6 +504,19 @@ class BlockReceiver implements Closeable {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /** Check whether checksum needs to be verified.
|
|
|
|
+ * Skip verifying checksum iff this is not the last one in the
|
|
|
|
+ * pipeline and clientName is non-null. i.e. Checksum is verified
|
|
|
|
+ * on all the datanodes when the data is being written by a
|
|
|
|
+ * datanode rather than a client. When a client is writing the data,
|
|
|
|
+ * protocol includes acks and only the last datanode needs to verify
|
|
|
|
+ * checksum (or any datanode that needs checksum translation).
|
|
|
|
+ * @return true if checksum verification is needed, otherwise false.
|
|
|
|
+ */
|
|
|
|
+ private boolean shouldVerifyChecksum() {
|
|
|
|
+ return (mirrorOut == null || isDatanode || needsChecksumTranslation);
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Receives and processes a packet. It can contain many chunks.
|
|
* Receives and processes a packet. It can contain many chunks.
|
|
* returns the number of data bytes that the packet has.
|
|
* returns the number of data bytes that the packet has.
|
|
@@ -567,9 +581,9 @@ class BlockReceiver implements Closeable {
|
|
}
|
|
}
|
|
|
|
|
|
// put in queue for pending acks
|
|
// put in queue for pending acks
|
|
- if (responder != null) {
|
|
|
|
|
|
+ if (responder != null && !shouldVerifyChecksum()) {
|
|
((PacketResponder)responder.getRunnable()).enqueue(seqno,
|
|
((PacketResponder)responder.getRunnable()).enqueue(seqno,
|
|
- lastPacketInBlock, offsetInBlock);
|
|
|
|
|
|
+ lastPacketInBlock, offsetInBlock, Status.SUCCESS);
|
|
}
|
|
}
|
|
|
|
|
|
//First write the packet to the mirror:
|
|
//First write the packet to the mirror:
|
|
@@ -605,15 +619,24 @@ class BlockReceiver implements Closeable {
|
|
|
|
|
|
buf.position(buf.limit()); // move to the end of the data.
|
|
buf.position(buf.limit()); // move to the end of the data.
|
|
|
|
|
|
- /* skip verifying checksum iff this is not the last one in the
|
|
|
|
- * pipeline and clientName is non-null. i.e. Checksum is verified
|
|
|
|
- * on all the datanodes when the data is being written by a
|
|
|
|
- * datanode rather than a client. Whe client is writing the data,
|
|
|
|
- * protocol includes acks and only the last datanode needs to verify
|
|
|
|
- * checksum.
|
|
|
|
- */
|
|
|
|
- if (mirrorOut == null || isDatanode || needsChecksumTranslation) {
|
|
|
|
- verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
|
|
|
|
|
|
+ if (shouldVerifyChecksum()) {
|
|
|
|
+ try {
|
|
|
|
+ verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ // checksum error detected locally. there is no reason to continue.
|
|
|
|
+ if (responder != null) {
|
|
|
|
+ try {
|
|
|
|
+ ((PacketResponder) responder.getRunnable()).enqueue(seqno,
|
|
|
|
+ lastPacketInBlock, offsetInBlock,
|
|
|
|
+ Status.ERROR_CHECKSUM);
|
|
|
|
+ // Wait until the responder sends back the response
|
|
|
|
+ // and interrupt this thread.
|
|
|
|
+ Thread.sleep(3000);
|
|
|
|
+ } catch (InterruptedException ie) { }
|
|
|
|
+ }
|
|
|
|
+ throw new IOException("Terminating due to a checksum error." + ioe);
|
|
|
|
+ }
|
|
|
|
+
|
|
if (needsChecksumTranslation) {
|
|
if (needsChecksumTranslation) {
|
|
// overwrite the checksums in the packet buffer with the
|
|
// overwrite the checksums in the packet buffer with the
|
|
// appropriate polynomial for the disk storage.
|
|
// appropriate polynomial for the disk storage.
|
|
@@ -695,6 +718,11 @@ class BlockReceiver implements Closeable {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ if (responder != null && shouldVerifyChecksum()) {
|
|
|
|
+ ((PacketResponder) responder.getRunnable()).enqueue(seqno,
|
|
|
|
+ lastPacketInBlock, offsetInBlock, Status.SUCCESS);
|
|
|
|
+ }
|
|
|
|
+
|
|
if (throttler != null) { // throttle I/O
|
|
if (throttler != null) { // throttle I/O
|
|
throttler.throttle(len);
|
|
throttler.throttle(len);
|
|
}
|
|
}
|
|
@@ -890,7 +918,7 @@ class BlockReceiver implements Closeable {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Processed responses from downstream datanodes in the pipeline
|
|
|
|
|
|
+ * Processes responses from downstream datanodes in the pipeline
|
|
* and sends back replies to the originator.
|
|
* and sends back replies to the originator.
|
|
*/
|
|
*/
|
|
class PacketResponder implements Runnable, Closeable {
|
|
class PacketResponder implements Runnable, Closeable {
|
|
@@ -943,9 +971,11 @@ class BlockReceiver implements Closeable {
|
|
* @param offsetInBlock
|
|
* @param offsetInBlock
|
|
*/
|
|
*/
|
|
synchronized void enqueue(final long seqno,
|
|
synchronized void enqueue(final long seqno,
|
|
- final boolean lastPacketInBlock, final long offsetInBlock) {
|
|
|
|
|
|
+ final boolean lastPacketInBlock,
|
|
|
|
+ final long offsetInBlock, final Status ackStatus) {
|
|
if (running) {
|
|
if (running) {
|
|
- final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock);
|
|
|
|
|
|
+ final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
|
|
|
|
+ ackStatus);
|
|
if(LOG.isDebugEnabled()) {
|
|
if(LOG.isDebugEnabled()) {
|
|
LOG.debug(myString + ": enqueue " + p);
|
|
LOG.debug(myString + ": enqueue " + p);
|
|
}
|
|
}
|
|
@@ -1071,20 +1101,31 @@ class BlockReceiver implements Closeable {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ Status myStatus = pkt == null ? Status.SUCCESS : pkt.ackStatus;
|
|
// construct my ack message
|
|
// construct my ack message
|
|
Status[] replies = null;
|
|
Status[] replies = null;
|
|
if (mirrorError) { // ack read error
|
|
if (mirrorError) { // ack read error
|
|
replies = new Status[2];
|
|
replies = new Status[2];
|
|
- replies[0] = Status.SUCCESS;
|
|
|
|
|
|
+ replies[0] = myStatus;
|
|
replies[1] = Status.ERROR;
|
|
replies[1] = Status.ERROR;
|
|
} else {
|
|
} else {
|
|
short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
|
|
short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
|
|
: ack.getNumOfReplies();
|
|
: ack.getNumOfReplies();
|
|
replies = new Status[1+ackLen];
|
|
replies = new Status[1+ackLen];
|
|
- replies[0] = Status.SUCCESS;
|
|
|
|
|
|
+ replies[0] = myStatus;
|
|
for (int i=0; i<ackLen; i++) {
|
|
for (int i=0; i<ackLen; i++) {
|
|
replies[i+1] = ack.getReply(i);
|
|
replies[i+1] = ack.getReply(i);
|
|
}
|
|
}
|
|
|
|
+ // If the mirror has reported that it received a corrupt packet,
|
|
|
|
+ // do self-destruct to mark myself bad, instead of the mirror node.
|
|
|
|
+ // The mirror is guaranteed to be good without corrupt data.
|
|
|
|
+ if (ackLen > 0 && replies[1] == Status.ERROR_CHECKSUM) {
|
|
|
|
+ running = false;
|
|
|
|
+ removeAckHead();
|
|
|
|
+ LOG.warn("Shutting down writer and responder due to a checksum error.");
|
|
|
|
+ receiverThread.interrupt();
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
PipelineAck replyAck = new PipelineAck(expected, replies);
|
|
PipelineAck replyAck = new PipelineAck(expected, replies);
|
|
|
|
|
|
@@ -1103,6 +1144,14 @@ class BlockReceiver implements Closeable {
|
|
removeAckHead();
|
|
removeAckHead();
|
|
// update bytes acked
|
|
// update bytes acked
|
|
}
|
|
}
|
|
|
|
+ // terminate after sending response if this node detected
|
|
|
|
+ // a checksum error
|
|
|
|
+ if (myStatus == Status.ERROR_CHECKSUM) {
|
|
|
|
+ running = false;
|
|
|
|
+ LOG.warn("Shutting down writer and responder due to a checksum error.");
|
|
|
|
+ receiverThread.interrupt();
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
LOG.warn("IOException in BlockReceiver.run(): ", e);
|
|
LOG.warn("IOException in BlockReceiver.run(): ", e);
|
|
if (running) {
|
|
if (running) {
|
|
@@ -1146,11 +1195,14 @@ class BlockReceiver implements Closeable {
|
|
final long seqno;
|
|
final long seqno;
|
|
final boolean lastPacketInBlock;
|
|
final boolean lastPacketInBlock;
|
|
final long offsetInBlock;
|
|
final long offsetInBlock;
|
|
|
|
+ final Status ackStatus;
|
|
|
|
|
|
- Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock) {
|
|
|
|
|
|
+ Packet(long seqno, boolean lastPacketInBlock,
|
|
|
|
+ long offsetInBlock, Status ackStatus) {
|
|
this.seqno = seqno;
|
|
this.seqno = seqno;
|
|
this.lastPacketInBlock = lastPacketInBlock;
|
|
this.lastPacketInBlock = lastPacketInBlock;
|
|
this.offsetInBlock = offsetInBlock;
|
|
this.offsetInBlock = offsetInBlock;
|
|
|
|
+ this.ackStatus = ackStatus;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -1158,6 +1210,7 @@ class BlockReceiver implements Closeable {
|
|
return getClass().getSimpleName() + "(seqno=" + seqno
|
|
return getClass().getSimpleName() + "(seqno=" + seqno
|
|
+ ", lastPacketInBlock=" + lastPacketInBlock
|
|
+ ", lastPacketInBlock=" + lastPacketInBlock
|
|
+ ", offsetInBlock=" + offsetInBlock
|
|
+ ", offsetInBlock=" + offsetInBlock
|
|
|
|
+ + ", ackStatus=" + ackStatus
|
|
+ ")";
|
|
+ ")";
|
|
}
|
|
}
|
|
}
|
|
}
|