@@ -23,7 +23,6 @@ import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.util.EnumSet;
-import java.util.Random;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -127,8 +126,6 @@ public class DFSOutputStream extends FSOutputSummer
   protected final AtomicReference<CachingStrategy> cachingStrategy;
   private FileEncryptionInfo fileEncryptionInfo;
   private int writePacketSize;
-  private boolean leaseRecovered = false;
-  private boolean exceptionInClose = false; //for unit test
 
   /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
   protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
@@ -835,39 +832,6 @@ public class DFSOutputStream extends FSOutputSummer
     }
   }
 
-  @VisibleForTesting
-  public void setExceptionInClose(boolean enable) {
-    exceptionInClose = enable;
-  }
-
-  private class EmulateExceptionInClose {
-    private Random rand = null;
-    private int kickedNum;
-
-    EmulateExceptionInClose(int callNum) {
-      if (exceptionInClose) {
-        rand = new Random();
-      }
-      kickedNum = callNum;
-    }
-
-    void kickRandomException() throws IOException {
-      if (exceptionInClose) {
-        if (kickedNum > 0) {
-          if (rand.nextInt(kickedNum) == 1) {
-            throw new IOException("Emulated random IOException in close");
-          }
-        }
-      }
-    }
-
-    void kickException() throws IOException {
-      if (exceptionInClose) {
-        throw new IOException("Emulated IOException in close");
-      }
-    }
-  }
-
   /**
    * Closes this output stream and releases any system
    * resources associated with this stream.
@@ -890,20 +854,7 @@ public class DFSOutputStream extends FSOutputSummer
   }
 
   protected synchronized void closeImpl() throws IOException {
-    boolean recoverOnCloseException = dfsClient.getConfiguration().getBoolean(
-        HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_KEY,
-        HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_DEFAULT);
     if (isClosed()) {
-      if (recoverOnCloseException && !leaseRecovered) {
-        try {
-          dfsClient.endFileLease(fileId);
-          dfsClient.recoverLease(src);
-          leaseRecovered = true;
-        } catch (Exception e) {
-          LOG.warn("Fail to recover lease for {}", src, e);
-        }
-      }
-
       LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
           closed, getStreamer().streamerClosed());
       try {
@@ -920,11 +871,8 @@ public class DFSOutputStream extends FSOutputSummer
       return;
     }
 
-    EmulateExceptionInClose eei = new EmulateExceptionInClose(5);
     try {
-      flushBuffer(); // flush from all upper layers
-      // for test
-      eei.kickRandomException();
+      flushBuffer(); // flush from all upper layers
 
       if (currentPacket != null) {
         enqueueCurrentPacket();
@@ -935,28 +883,12 @@ public class DFSOutputStream extends FSOutputSummer
       }
 
       try {
-        flushInternal(); // flush all data to Datanodes
+        flushInternal(); // flush all data to Datanodes
       } catch (IOException ioe) {
         cleanupAndRethrowIOException(ioe);
       }
-      // for test
-      eei.kickRandomException();
       completeFile();
-      // for test
-      eei.kickException();
     } catch (ClosedChannelException ignored) {
-    } catch (IOException ioe) {
-      if (recoverOnCloseException) {
-        try {
-          dfsClient.endFileLease(fileId);
-          dfsClient.recoverLease(src);
-          leaseRecovered = true;
-        } catch (Exception e) {
-          // Ignore exception rendered by recoverLease. Throw original
-          // exception
-        }
-      }
-      throw ioe;
     } finally {
       // Failures may happen when flushing data.
       // Streamers may keep waiting for the new block information.