|
@@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.ClosedChannelException;
|
|
|
import java.util.EnumSet;
|
|
|
+import java.util.Random;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
|
@@ -126,6 +127,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
protected final AtomicReference<CachingStrategy> cachingStrategy;
|
|
|
private FileEncryptionInfo fileEncryptionInfo;
|
|
|
private int writePacketSize;
|
|
|
+ private boolean leaseRecovered = false;
|
|
|
+ private boolean exceptionInClose = false; //for unit test
|
|
|
|
|
|
/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
|
|
|
protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
|
|
@@ -832,6 +835,39 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ public void setExceptionInClose(boolean enable) {
|
|
|
+ exceptionInClose = enable;
|
|
|
+ }
|
|
|
+
|
|
|
+ private class EmulateExceptionInClose {
|
|
|
+ private Random rand = null;
|
|
|
+ private int kickedNum;
|
|
|
+
|
|
|
+ EmulateExceptionInClose(int callNum) {
|
|
|
+ if (exceptionInClose) {
|
|
|
+ rand = new Random();
|
|
|
+ }
|
|
|
+ kickedNum = callNum;
|
|
|
+ }
|
|
|
+
|
|
|
+ void kickRandomException() throws IOException {
|
|
|
+ if (exceptionInClose) {
|
|
|
+ if (kickedNum > 0) {
|
|
|
+ if (rand.nextInt(kickedNum) == 1) {
|
|
|
+ throw new IOException("Emulated random IOException in close");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void kickException() throws IOException {
|
|
|
+ if (exceptionInClose) {
|
|
|
+ throw new IOException("Emulated IOException in close");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Closes this output stream and releases any system
|
|
|
* resources associated with this stream.
|
|
@@ -854,7 +890,20 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
}
|
|
|
|
|
|
protected synchronized void closeImpl() throws IOException {
|
|
|
+ boolean recoverOnCloseException = dfsClient.getConfiguration().getBoolean(
|
|
|
+ HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_KEY,
|
|
|
+ HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_DEFAULT);
|
|
|
if (isClosed()) {
|
|
|
+ if (recoverOnCloseException && !leaseRecovered) {
|
|
|
+ try {
|
|
|
+ dfsClient.endFileLease(fileId);
|
|
|
+ dfsClient.recoverLease(src);
|
|
|
+ leaseRecovered = true;
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Fail to recover lease for {}", src, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
|
|
|
closed, getStreamer().streamerClosed());
|
|
|
try {
|
|
@@ -871,8 +920,11 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ EmulateExceptionInClose eei = new EmulateExceptionInClose(5);
|
|
|
try {
|
|
|
- flushBuffer(); // flush from all upper layers
|
|
|
+ flushBuffer(); // flush from all upper layers
|
|
|
+ // for test
|
|
|
+ eei.kickRandomException();
|
|
|
|
|
|
if (currentPacket != null) {
|
|
|
enqueueCurrentPacket();
|
|
@@ -883,12 +935,28 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
- flushInternal(); // flush all data to Datanodes
|
|
|
+ flushInternal(); // flush all data to Datanodes
|
|
|
} catch (IOException ioe) {
|
|
|
cleanupAndRethrowIOException(ioe);
|
|
|
}
|
|
|
+ // for test
|
|
|
+ eei.kickRandomException();
|
|
|
completeFile();
|
|
|
+ // for test
|
|
|
+ eei.kickException();
|
|
|
} catch (ClosedChannelException ignored) {
|
|
|
+ } catch (IOException ioe) {
|
|
|
+ if (recoverOnCloseException) {
|
|
|
+ try {
|
|
|
+ dfsClient.endFileLease(fileId);
|
|
|
+ dfsClient.recoverLease(src);
|
|
|
+ leaseRecovered = true;
|
|
|
+ } catch (Exception e) {
|
|
|
+ // Ignore exception rendered by recoverLease. Throw original
|
|
|
+ // exception
|
|
|
+ }
|
|
|
+ }
|
|
|
+ throw ioe;
|
|
|
} finally {
|
|
|
// Failures may happen when flushing data.
|
|
|
// Streamers may keep waiting for the new block information.
|