|
@@ -38,6 +38,7 @@ import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.fs.CanSetDropBehind;
|
|
@@ -85,7 +86,6 @@ import org.apache.hadoop.util.Daemon;
|
|
|
import org.apache.hadoop.util.DataChecksum;
|
|
|
import org.apache.hadoop.util.Progressable;
|
|
|
import org.apache.hadoop.util.Time;
|
|
|
-import org.mortbay.log.Log;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.cache.CacheBuilder;
|
|
@@ -141,7 +141,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
private long bytesCurBlock = 0; // bytes writen in current block
|
|
|
private int packetSize = 0; // write packet size, not including the header.
|
|
|
private int chunksPerPacket = 0;
|
|
|
- private volatile IOException lastException = null;
|
|
|
+ private final AtomicReference<IOException> lastException = new AtomicReference<IOException>();
|
|
|
private long artificialSlowdown = 0;
|
|
|
private long lastFlushOffset = 0; // offset when flush was invoked
|
|
|
//persist blocks on namenode
|
|
@@ -814,8 +814,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
if (++pipelineRecoveryCount > 5) {
|
|
|
DFSClient.LOG.warn("Error recovering pipeline for writing " +
|
|
|
block + ". Already retried 5 times for the same packet.");
|
|
|
- lastException = new IOException("Failing write. Tried pipeline " +
|
|
|
- "recovery 5 times without success.");
|
|
|
+ lastException.set(new IOException("Failing write. Tried pipeline " +
|
|
|
+ "recovery 5 times without success."));
|
|
|
streamerClosed = true;
|
|
|
return false;
|
|
|
}
|
|
@@ -1005,8 +1005,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
}
|
|
|
}
|
|
|
if (nodes.length <= 1) {
|
|
|
- lastException = new IOException("All datanodes " + pipelineMsg
|
|
|
- + " are bad. Aborting...");
|
|
|
+ lastException.set(new IOException("All datanodes " + pipelineMsg
|
|
|
+ + " are bad. Aborting..."));
|
|
|
streamerClosed = true;
|
|
|
return false;
|
|
|
}
|
|
@@ -1021,7 +1021,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
newnodes.length-errorIndex);
|
|
|
nodes = newnodes;
|
|
|
hasError = false;
|
|
|
- lastException = null;
|
|
|
+ lastException.set(null);
|
|
|
errorIndex = -1;
|
|
|
}
|
|
|
|
|
@@ -1065,7 +1065,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
ExtendedBlock oldBlock = block;
|
|
|
do {
|
|
|
hasError = false;
|
|
|
- lastException = null;
|
|
|
+ lastException.set(null);
|
|
|
errorIndex = -1;
|
|
|
success = false;
|
|
|
|
|
@@ -1279,9 +1279,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
}
|
|
|
|
|
|
private void setLastException(IOException e) {
|
|
|
- if (lastException == null) {
|
|
|
- lastException = e;
|
|
|
- }
|
|
|
+ lastException.compareAndSet(null, e);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1313,7 +1311,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
|
|
|
protected void checkClosed() throws IOException {
|
|
|
if (closed) {
|
|
|
- IOException e = lastException;
|
|
|
+ IOException e = lastException.get();
|
|
|
throw e != null ? e : new ClosedChannelException();
|
|
|
}
|
|
|
}
|
|
@@ -1469,6 +1467,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
|
|
|
private void waitAndQueueCurrentPacket() throws IOException {
|
|
|
synchronized (dataQueue) {
|
|
|
+ try {
|
|
|
// If queue is full, then wait till we have enough space
|
|
|
while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
|
|
|
try {
|
|
@@ -1487,6 +1486,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
}
|
|
|
checkClosed();
|
|
|
queueCurrentPacket();
|
|
|
+ } catch (ClosedChannelException e) {
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1735,7 +1736,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
DFSClient.LOG.warn("Error while syncing", e);
|
|
|
synchronized (this) {
|
|
|
if (!closed) {
|
|
|
- lastException = new IOException("IOException flush:" + e);
|
|
|
+ lastException.set(new IOException("IOException flush:" + e));
|
|
|
closeThreads(true);
|
|
|
}
|
|
|
}
|
|
@@ -1793,21 +1794,25 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
if (DFSClient.LOG.isDebugEnabled()) {
|
|
|
DFSClient.LOG.debug("Waiting for ack for: " + seqno);
|
|
|
}
|
|
|
- synchronized (dataQueue) {
|
|
|
- while (!closed) {
|
|
|
- checkClosed();
|
|
|
- if (lastAckedSeqno >= seqno) {
|
|
|
- break;
|
|
|
- }
|
|
|
- try {
|
|
|
- dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- throw new InterruptedIOException(
|
|
|
- "Interrupted while waiting for data to be acknowledged by pipeline");
|
|
|
+ try {
|
|
|
+ synchronized (dataQueue) {
|
|
|
+ while (!closed) {
|
|
|
+ checkClosed();
|
|
|
+ if (lastAckedSeqno >= seqno) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ dataQueue.wait(1000); // when we receive an ack, we notify on
|
|
|
+ // dataQueue
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ throw new InterruptedIOException(
|
|
|
+ "Interrupted while waiting for data to be acknowledged by pipeline");
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+ checkClosed();
|
|
|
+ } catch (ClosedChannelException e) {
|
|
|
}
|
|
|
- checkClosed();
|
|
|
}
|
|
|
|
|
|
private synchronized void start() {
|
|
@@ -1853,7 +1858,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
@Override
|
|
|
public synchronized void close() throws IOException {
|
|
|
if (closed) {
|
|
|
- IOException e = lastException;
|
|
|
+ IOException e = lastException.getAndSet(null);
|
|
|
if (e == null)
|
|
|
return;
|
|
|
else
|
|
@@ -1880,6 +1885,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
closeThreads(false);
|
|
|
completeFile(lastBlock);
|
|
|
dfsClient.endFileLease(src);
|
|
|
+ } catch (ClosedChannelException e) {
|
|
|
} finally {
|
|
|
closed = true;
|
|
|
}
|