|
@@ -39,6 +39,7 @@ import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
|
@@ -241,8 +242,6 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
/**
|
|
|
* Create a new packet.
|
|
|
*
|
|
|
- * @param pktSize maximum size of the packet,
|
|
|
- * including checksum data and actual data.
|
|
|
* @param chunksPerPkt maximum number of chunks per packet.
|
|
|
* @param offsetInBlock offset in bytes into the HDFS block.
|
|
|
*/
|
|
@@ -405,7 +404,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
private String[] favoredNodes;
|
|
|
volatile boolean hasError = false;
|
|
|
volatile int errorIndex = -1;
|
|
|
- volatile int restartingNodeIndex = -1; // Restarting node index
|
|
|
+ // Restarting node index
|
|
|
+ AtomicInteger restartingNodeIndex = new AtomicInteger(-1);
|
|
|
private long restartDeadline = 0; // Deadline of DN restart
|
|
|
private BlockConstructionStage stage; // block construction stage
|
|
|
private long bytesSent = 0; // number of bytes that've been sent
|
|
@@ -556,7 +556,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
try {
|
|
|
// process datanode IO errors if any
|
|
|
boolean doSleep = false;
|
|
|
- if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) {
|
|
|
+ if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) {
|
|
|
doSleep = processDatanodeError();
|
|
|
}
|
|
|
|
|
@@ -699,7 +699,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
}
|
|
|
} catch (Throwable e) {
|
|
|
// Log warning if there was a real error.
|
|
|
- if (restartingNodeIndex == -1) {
|
|
|
+ if (restartingNodeIndex.get() == -1) {
|
|
|
DFSClient.LOG.warn("DataStreamer Exception", e);
|
|
|
}
|
|
|
if (e instanceof IOException) {
|
|
@@ -708,7 +708,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
setLastException(new IOException("DataStreamer Exception: ",e));
|
|
|
}
|
|
|
hasError = true;
|
|
|
- if (errorIndex == -1 && restartingNodeIndex == -1) {
|
|
|
+ if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
|
|
|
// Not a datanode issue
|
|
|
streamerClosed = true;
|
|
|
}
|
|
@@ -806,7 +806,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
|
|
|
/** Set the restarting node index. Called by responder */
|
|
|
synchronized void setRestartingNodeIndex(int idx) {
|
|
|
- restartingNodeIndex = idx;
|
|
|
+ restartingNodeIndex.set(idx);
|
|
|
// If the data streamer has already set the primary node
|
|
|
// bad, clear it. It is likely that the write failed due to
|
|
|
// the DN shutdown. Even if it was a real failure, the pipeline
|
|
@@ -821,7 +821,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
*/
|
|
|
synchronized void tryMarkPrimaryDatanodeFailed() {
|
|
|
// There should be no existing error and no ongoing restart.
|
|
|
- if ((errorIndex == -1) && (restartingNodeIndex == -1)) {
|
|
|
+ if ((errorIndex == -1) && (restartingNodeIndex.get() == -1)) {
|
|
|
errorIndex = 0;
|
|
|
}
|
|
|
}
|
|
@@ -962,7 +962,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
synchronized (dataQueue) {
|
|
|
dataQueue.notifyAll();
|
|
|
}
|
|
|
- if (restartingNodeIndex == -1) {
|
|
|
+ if (restartingNodeIndex.get() == -1) {
|
|
|
DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception "
|
|
|
+ " for block " + block, e);
|
|
|
}
|
|
@@ -1186,7 +1186,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
// Sleep before reconnect if a dn is restarting.
|
|
|
// This process will be repeated until the deadline or the datanode
|
|
|
// starts back up.
|
|
|
- if (restartingNodeIndex >= 0) {
|
|
|
+ if (restartingNodeIndex.get() >= 0) {
|
|
|
// 4 seconds or the configured deadline period, whichever is shorter.
|
|
|
// This is the retry interval and recovery will be retried in this
|
|
|
// interval until timeout or success.
|
|
@@ -1196,7 +1196,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
Thread.sleep(delay);
|
|
|
} catch (InterruptedException ie) {
|
|
|
lastException.set(new IOException("Interrupted while waiting for " +
|
|
|
- "datanode to restart. " + nodes[restartingNodeIndex]));
|
|
|
+ "datanode to restart. " + nodes[restartingNodeIndex.get()]));
|
|
|
streamerClosed = true;
|
|
|
return false;
|
|
|
}
|
|
@@ -1237,21 +1237,21 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
setPipeline(newnodes, newStorageTypes, newStorageIDs);
|
|
|
|
|
|
// Just took care of a node error while waiting for a node restart
|
|
|
- if (restartingNodeIndex >= 0) {
|
|
|
+ if (restartingNodeIndex.get() >= 0) {
|
|
|
// If the error came from a node further away than the restarting
|
|
|
// node, the restart must have been complete.
|
|
|
- if (errorIndex > restartingNodeIndex) {
|
|
|
- restartingNodeIndex = -1;
|
|
|
- } else if (errorIndex < restartingNodeIndex) {
|
|
|
+ if (errorIndex > restartingNodeIndex.get()) {
|
|
|
+ restartingNodeIndex.set(-1);
|
|
|
+ } else if (errorIndex < restartingNodeIndex.get()) {
|
|
|
// the node index has shifted.
|
|
|
- restartingNodeIndex--;
|
|
|
+ restartingNodeIndex.decrementAndGet();
|
|
|
} else {
|
|
|
// this shouldn't happen...
|
|
|
assert false;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if (restartingNodeIndex == -1) {
|
|
|
+ if (restartingNodeIndex.get() == -1) {
|
|
|
hasError = false;
|
|
|
}
|
|
|
lastException.set(null);
|
|
@@ -1293,10 +1293,10 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
|
|
|
}
|
|
|
|
|
|
- if (restartingNodeIndex >= 0) {
|
|
|
+ if (restartingNodeIndex.get() >= 0) {
|
|
|
assert hasError == true;
|
|
|
// check errorIndex set above
|
|
|
- if (errorIndex == restartingNodeIndex) {
|
|
|
+ if (errorIndex == restartingNodeIndex.get()) {
|
|
|
// ignore, if came from the restarting node
|
|
|
errorIndex = -1;
|
|
|
}
|
|
@@ -1306,8 +1306,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
}
|
|
|
// expired. declare the restarting node dead
|
|
|
restartDeadline = 0;
|
|
|
- int expiredNodeIndex = restartingNodeIndex;
|
|
|
- restartingNodeIndex = -1;
|
|
|
+ int expiredNodeIndex = restartingNodeIndex.get();
|
|
|
+ restartingNodeIndex.set(-1);
|
|
|
DFSClient.LOG.warn("Datanode did not restart in time: " +
|
|
|
nodes[expiredNodeIndex]);
|
|
|
// Mark the restarting node as failed. If there is any other failed
|
|
@@ -1459,7 +1459,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
// from the local datanode. Thus it is safe to treat this as a
|
|
|
// regular node error.
|
|
|
if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
|
|
|
- restartingNodeIndex == -1) {
|
|
|
+ restartingNodeIndex.get() == -1) {
|
|
|
checkRestart = true;
|
|
|
throw new IOException("A datanode is restarting.");
|
|
|
}
|
|
@@ -1476,10 +1476,10 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
assert null == blockStream : "Previous blockStream unclosed";
|
|
|
blockStream = out;
|
|
|
result = true; // success
|
|
|
- restartingNodeIndex = -1;
|
|
|
+ restartingNodeIndex.set(-1);
|
|
|
hasError = false;
|
|
|
} catch (IOException ie) {
|
|
|
- if (restartingNodeIndex == -1) {
|
|
|
+ if (restartingNodeIndex.get() == -1) {
|
|
|
DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
|
|
|
}
|
|
|
if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
|
|
@@ -1511,10 +1511,10 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
if (checkRestart && shouldWaitForRestart(errorIndex)) {
|
|
|
restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
|
|
|
Time.now();
|
|
|
- restartingNodeIndex = errorIndex;
|
|
|
+ restartingNodeIndex.set(errorIndex);
|
|
|
errorIndex = -1;
|
|
|
DFSClient.LOG.info("Waiting for the datanode to be restarted: " +
|
|
|
- nodes[restartingNodeIndex]);
|
|
|
+ nodes[restartingNodeIndex.get()]);
|
|
|
}
|
|
|
hasError = true;
|
|
|
setLastException(ie);
|