|
@@ -1167,7 +1167,14 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
mirrorIn = null;
|
|
|
IOUtils.closeSocket(mirrorSock);
|
|
|
mirrorSock = null;
|
|
|
- throw e;
|
|
|
+ if (client.length() > 0) {
|
|
|
+ throw e;
|
|
|
+ } else {
|
|
|
+ LOG.info(dnRegistration + ":Exception transfering block " +
|
|
|
+ block + " to mirror " + mirrorNode +
|
|
|
+ ". continuing without the mirror.\n" +
|
|
|
+ StringUtils.stringifyException(e));
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2220,6 +2227,26 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * While writing to mirrorOut, failure to write to mirror should not
|
|
|
+ * affect this datanode unless a client is writing the block.
|
|
|
+ */
|
|
|
+ private void handleMirrorOutError(IOException ioe) throws IOException {
|
|
|
+ LOG.info(dnRegistration + ":Exception writing block " +
|
|
|
+ block + " to mirror " + mirrorAddr + "\n" +
|
|
|
+ StringUtils.stringifyException(ioe));
|
|
|
+ mirrorOut = null;
|
|
|
+ //
|
|
|
+ // If stream-copy fails, continue
|
|
|
+ // writing to disk for replication requests. For client
|
|
|
+ // writes, return error so that the client can do error
|
|
|
+ // recovery.
|
|
|
+ //
|
|
|
+ if (clientName.length() > 0) {
|
|
|
+ throw ioe;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/* receive a chunk: write it to disk & mirror it to another stream */
|
|
|
private void receiveChunk( int len, byte[] checksumBuf, int checksumOff )
|
|
|
throws IOException {
|
|
@@ -2253,19 +2280,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
mirrorOut.write(checksumBuf, checksumOff, checksumSize);
|
|
|
mirrorOut.write(buf, 0, len);
|
|
|
} catch (IOException ioe) {
|
|
|
- LOG.info(dnRegistration + ":Exception writing block " +
|
|
|
- block + " to mirror " + mirrorAddr + "\n" +
|
|
|
- StringUtils.stringifyException(ioe));
|
|
|
- mirrorOut = null;
|
|
|
- //
|
|
|
- // If stream-copy fails, continue
|
|
|
- // writing to disk for replication requests. For client
|
|
|
- // writes, return error so that the client can do error
|
|
|
- // recovery.
|
|
|
- //
|
|
|
- if (clientName.length() > 0) {
|
|
|
- throw ioe;
|
|
|
- }
|
|
|
+ handleMirrorOutError(ioe);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2339,26 +2354,19 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
mirrorOut.writeLong(seqno);
|
|
|
mirrorOut.writeBoolean(lastPacketInBlock);
|
|
|
} catch (IOException e) {
|
|
|
- LOG.info("Exception writing to mirror " + mirrorAddr + "\n"
|
|
|
- + StringUtils.stringifyException(e));
|
|
|
- mirrorOut = null;
|
|
|
-
|
|
|
- // If stream-copy fails, continue
|
|
|
- // writing to disk for replication requests. For client
|
|
|
- // writes, return error so that the client can do error
|
|
|
- // recovery.
|
|
|
- //
|
|
|
- if (clientName.length() > 0) {
|
|
|
- throw e;
|
|
|
- }
|
|
|
+ handleMirrorOutError(e);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
if (len == 0) {
|
|
|
LOG.info("Receiving empty packet for block " + block);
|
|
|
if (mirrorOut != null) {
|
|
|
- mirrorOut.writeInt(len);
|
|
|
- mirrorOut.flush();
|
|
|
+ try {
|
|
|
+ mirrorOut.writeInt(len);
|
|
|
+ mirrorOut.flush();
|
|
|
+ } catch (IOException e) {
|
|
|
+ handleMirrorOutError(e);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2397,7 +2405,11 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
|
|
|
if (curPacketSize == packetSize) {
|
|
|
if (mirrorOut != null) {
|
|
|
- mirrorOut.flush();
|
|
|
+ try {
|
|
|
+ mirrorOut.flush();
|
|
|
+ } catch (IOException e) {
|
|
|
+ handleMirrorOutError(e);
|
|
|
+ }
|
|
|
}
|
|
|
break;
|
|
|
}
|
|
@@ -2421,15 +2433,15 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
|
|
|
|
|
|
public void receiveBlock(
|
|
|
- DataOutputStream mirrorOut, // output to next datanode
|
|
|
- DataInputStream mirrorIn, // input from next datanode
|
|
|
+ DataOutputStream mirrOut, // output to next datanode
|
|
|
+ DataInputStream mirrIn, // input from next datanode
|
|
|
DataOutputStream replyOut, // output to previous datanode
|
|
|
- String mirrorAddr, Throttler throttler,
|
|
|
+ String mirrAddr, Throttler throttlerArg,
|
|
|
int numTargets) throws IOException {
|
|
|
|
|
|
- this.mirrorOut = mirrorOut;
|
|
|
- this.mirrorAddr = mirrorAddr;
|
|
|
- this.throttler = throttler;
|
|
|
+ mirrorOut = mirrOut;
|
|
|
+ mirrorAddr = mirrAddr;
|
|
|
+ throttler = throttlerArg;
|
|
|
|
|
|
try {
|
|
|
// write data chunk header
|
|
@@ -2439,7 +2451,7 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
}
|
|
|
if (clientName.length() > 0) {
|
|
|
responder = new Daemon(threadGroup,
|
|
|
- new PacketResponder(this, block, mirrorIn,
|
|
|
+ new PacketResponder(this, block, mirrIn,
|
|
|
replyOut, numTargets,
|
|
|
clientName));
|
|
|
responder.start(); // start thread to processes reponses
|
|
@@ -2456,8 +2468,12 @@ public class DataNode implements FSConstants, Runnable {
|
|
|
|
|
|
// flush the mirror out
|
|
|
if (mirrorOut != null) {
|
|
|
- mirrorOut.writeInt(0); // mark the end of the block
|
|
|
- mirrorOut.flush();
|
|
|
+ try {
|
|
|
+ mirrorOut.writeInt(0); // mark the end of the block
|
|
|
+ mirrorOut.flush();
|
|
|
+ } catch (IOException e) {
|
|
|
+ handleMirrorOutError(e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// wait for all outstanding packet responses. And then
|