@@ -36,8 +36,6 @@ import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
@@ -2425,6 +2423,13 @@ public class DFSClient implements FSConstants, java.io.Closeable {
long getLastByteOffsetBlock() {
return offsetInBlock + dataPos - dataStart;
}
+
+ public String toString() {
+ return "packet seqno:" + this.seqno +
+ " offsetInBlock:" + this.offsetInBlock +
+ " lastPacketInBlock:" + this.lastPacketInBlock +
+ " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
+ }
}

//
@@ -2436,8 +2441,6 @@ public class DFSClient implements FSConstants, java.io.Closeable {
// of them are received, the DataStreamer closes the current block.
//
class DataStreamer extends Daemon {
- private static final int MAX_RECOVERY_ERROR_COUNT = 5; // try block recovery 5 times
- private int recoveryErrorCount = 0; // number of times block recovery failed
private volatile boolean streamerClosed = false;
private Block block; // its length is number of bytes acked
private AccessToken accessToken;
@@ -2446,7 +2449,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
private ResponseProcessor response = null;
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
volatile boolean hasError = false;
- volatile int errorIndex = 0;
+ volatile int errorIndex = -1;
private BlockConstructionStage stage; // block construction stage
private long bytesSent = 0; // number of bytes that've been sent

@@ -2511,7 +2514,17 @@ public class DFSClient implements FSConstants, java.io.Closeable {
"of file " + src);

}
- processDatanodeError(true, true);
+ }
+
+ /**
+ * Initialize for data streaming
+ */
+ private void initDataStreaming() {
+ this.setName("DataStreamer for file " + src +
+ " block " + block);
+ response = new ResponseProcessor(nodes);
+ response.start();
+ stage = BlockConstructionStage.DATA_STREAMING;
}

/*
@@ -2533,38 +2546,41 @@ public class DFSClient implements FSConstants, java.io.Closeable {

Packet one = null;

- // process IO errors if any
- boolean doSleep = processDatanodeError(hasError, false);
+ try {
+ // process datanode IO errors if any
+ boolean doSleep = false;
+ if (hasError && errorIndex>=0) {
+ doSleep = processDatanodeError();
+ }

- synchronized (dataQueue) {
- // wait for a packet to be sent.
- while ((!streamerClosed && !hasError && clientRunning
- && dataQueue.size() == 0) || doSleep) {
- try {
- dataQueue.wait(1000);
- } catch (InterruptedException e) {
+ synchronized (dataQueue) {
+ // wait for a packet to be sent.
+ while ((!streamerClosed && !hasError && clientRunning
+ && dataQueue.size() == 0) || doSleep) {
+ try {
+ dataQueue.wait(1000);
+ } catch (InterruptedException e) {
+ }
+ doSleep = false;
}
- doSleep = false;
- }
- if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
- continue;
+ if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
+ continue;
+ }
+ // get packet to be sent.
+ one = dataQueue.getFirst();
}
- // get packet to be sent.
- one = dataQueue.getFirst();
- }

- try {
long offsetInBlock = one.offsetInBlock;

// get new block from namenode.
- if (blockStream == null) {
+ if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
LOG.debug("Allocating new block");
- nodes = nextBlockOutputStream(src);
- this.setName("DataStreamer for file " + src +
- " block " + block);
- response = new ResponseProcessor(nodes);
- response.start();
- stage = BlockConstructionStage.DATA_STREAMING;
+ nodes = nextBlockOutputStream(src);
+ initDataStreaming();
+ } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
+ LOG.debug("Append to block " + block);
+ setupPipelineForAppendOrRecovery();
+ initDataStreaming();
}

if (offsetInBlock >= blockSize) {
@@ -2584,6 +2600,15 @@ public class DFSClient implements FSConstants, java.io.Closeable {
dataQueue.notifyAll();
}

+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DataStreamer block " + block +
+ " sending packet seqno:" + one.seqno +
+ " size:" + buf.remaining() +
+ " offsetInBlock:" + one.offsetInBlock +
+ " lastPacketInBlock:" + one.lastPacketInBlock +
+ " lastByteOffsetInBlock: " + one.getLastByteOffsetBlock());
+ }
+
// write out data to remote datanode
blockStream.write(buf.array(), buf.position(), buf.remaining());
blockStream.flush();
@@ -2610,13 +2635,6 @@ public class DFSClient implements FSConstants, java.io.Closeable {
blockStream.flush();
}
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("DataStreamer block " + block +
- " wrote packet seqno:" + one.seqno +
- " size:" + buf.remaining() +
- " offsetInBlock:" + one.offsetInBlock +
- " lastPacketInBlock:" + one.lastPacketInBlock);
- }
} catch (Throwable e) {
LOG.warn("DataStreamer Exception: " +
StringUtils.stringifyException(e));
@@ -2624,6 +2642,9 @@ public class DFSClient implements FSConstants, java.io.Closeable {
setLastException((IOException)e);
}
hasError = true;
+ if (errorIndex == -1) { // not a datanode error
+ streamerClosed = true;
+ }
}

@@ -2751,10 +2772,20 @@ public class DFSClient implements FSConstants, java.io.Closeable {
}

// processes response status from all datanodes.
+ String replies = null;
+ if (LOG.isDebugEnabled()) {
+ replies = "DFSClient Replies for seqno " + seqno + " are";
+ }
for (int i = 0; i < targets.length && clientRunning; i++) {
final DataTransferProtocol.Status reply
= DataTransferProtocol.Status.read(blockReplyStream);
+ if (LOG.isDebugEnabled()) {
+ replies += " " + reply;
+ }
if (reply != SUCCESS) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(replies);
+ }
errorIndex = i; // first bad datanode
throw new IOException("Bad response " + reply +
" for block " + block +
@@ -2763,11 +2794,18 @@ public class DFSClient implements FSConstants, java.io.Closeable {
}
}

- if (one != null) {
- // update bytesAcked
- block.setNumBytes(one.getLastByteOffsetBlock());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(replies);
}

+ if (one == null) {
+ throw new IOException("Panic: responder did not receive " +
+ "an ack for a packet: " + seqno);
+ }
+
+ // update bytesAcked
+ block.setNumBytes(one.getLastByteOffsetBlock());
+
synchronized (dataQueue) {
ackQueue.removeFirst();
dataQueue.notifyAll();
@@ -2778,6 +2816,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
setLastException((IOException)e);
}
hasError = true;
+ errorIndex = errorIndex==-1 ? 0 : errorIndex;
synchronized (dataQueue) {
dataQueue.notifyAll();
}
@@ -2800,21 +2839,12 @@ public class DFSClient implements FSConstants, java.io.Closeable {
// threads and mark stream as closed. Returns true if we should
// sleep for a while after returning from this call.
//
- private boolean processDatanodeError(boolean error, boolean isAppend) {
- if (!error) {
- return false;
- }
+ private boolean processDatanodeError() throws IOException {
if (response != null) {
LOG.info("Error Recovery for block " + block +
" waiting for responder to exit. ");
return true;
}
- if (errorIndex >= 0) {
- LOG.warn("Error Recovery for block " + block
- + " bad datanode[" + errorIndex + "] "
- + (nodes == null? "nodes == null": nodes[errorIndex].getName()));
- }
-
closeStream();

// move packets from ack queue to front of the data queue
@@ -2823,31 +2853,49 @@ public class DFSClient implements FSConstants, java.io.Closeable {
ackQueue.clear();
}

+ boolean doSleep = setupPipelineForAppendOrRecovery();
+
+ if (!streamerClosed && clientRunning) {
+ initDataStreaming();
+ }
+
+ return doSleep;
+ }
+
+
+ /**
+ * Open a DataOutputStream to a DataNode pipeline so that
+ * it can be written to.
+ * This happens when a file is appended or data streaming fails.
+ * It keeps on trying until a pipeline is set up.
+ */
+ private boolean setupPipelineForAppendOrRecovery() throws IOException {
+ // check number of datanodes
+ if (nodes == null || nodes.length == 0) {
+ String msg = "Could not get block locations. " + "Source file \""
+ + src + "\" - Aborting...";
+ LOG.warn(msg);
+ setLastException(new IOException(msg));
+ streamerClosed = true;
+ return false;
+ }
+
boolean success = false;
+ long newGS = 0L;
while (!success && !streamerClosed && clientRunning) {
- DatanodeInfo[] newnodes = null;
- if (nodes == null) {
- String msg = "Could not get block locations. " + "Source file \""
- + src + "\" - Aborting...";
- LOG.warn(msg);
- setLastException(new IOException(msg));
- streamerClosed = true;
- return false;
- }
- StringBuilder pipelineMsg = new StringBuilder();
- for (int j = 0; j < nodes.length; j++) {
- pipelineMsg.append(nodes[j].getName());
- if (j < nodes.length - 1) {
- pipelineMsg.append(", ");
- }
- }
+ boolean isRecovery = hasError;
// remove bad datanode from list of datanodes.
// If errorIndex was not set (i.e. appends), then do not remove
// any datanodes
//
- if (errorIndex < 0) {
- newnodes = nodes;
- } else {
+ if (errorIndex >= 0) {
+ StringBuilder pipelineMsg = new StringBuilder();
+ for (int j = 0; j < nodes.length; j++) {
+ pipelineMsg.append(nodes[j].getName());
+ if (j < nodes.length - 1) {
+ pipelineMsg.append(", ");
+ }
+ }
if (nodes.length <= 1) {
lastException = new IOException("All datanodes " + pipelineMsg
+ " are bad. Aborting...");
@@ -2857,86 +2905,32 @@ public class DFSClient implements FSConstants, java.io.Closeable {
LOG.warn("Error Recovery for block " + block +
" in pipeline " + pipelineMsg +
": bad datanode " + nodes[errorIndex].getName());
- newnodes = new DatanodeInfo[nodes.length-1];
+ DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
newnodes.length-errorIndex);
+ nodes = newnodes;
+ this.hasError = false;
+ lastException = null;
+ errorIndex = -1;
}

- // Tell the primary datanode to do error recovery
- // by stamping appropriate generation stamps.
- //
- LocatedBlock newBlock = null;
- ClientDatanodeProtocol primary = null;
- DatanodeInfo primaryNode = null;
- try {
- // Pick the "least" datanode as the primary datanode to avoid deadlock.
- primaryNode = Collections.min(Arrays.asList(newnodes));
- primary = createClientDatanodeProtocolProxy(primaryNode, conf);
- newBlock = primary.recoverBlock(block, isAppend, newnodes);
- } catch (IOException e) {
- recoveryErrorCount++;
- if (recoveryErrorCount > MAX_RECOVERY_ERROR_COUNT) {
- if (nodes.length > 1) {
- // if the primary datanode failed, remove it from the list.
- // The original bad datanode is left in the list because it is
- // conservative to remove only one datanode in one iteration.
- for (int j = 0; j < nodes.length; j++) {
- if (nodes[j].equals(primaryNode)) {
- errorIndex = j; // forget original bad node.
- }
- }
- // remove primary node from list
- newnodes = new DatanodeInfo[nodes.length-1];
- System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
- System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
- newnodes.length-errorIndex);
- nodes = newnodes;
- LOG.warn("Error Recovery for block " + block + " failed "
- + " because recovery from primary datanode " + primaryNode
- + " failed " + recoveryErrorCount + " times. "
- + " Pipeline was " + pipelineMsg
- + ". Marking primary datanode as bad.");
- recoveryErrorCount = 0;
- errorIndex = -1;
- return true; // sleep when we return from here
- }
- String emsg = "Error Recovery for block " + block + " failed "
- + " because recovery from primary datanode " + primaryNode
- + " failed " + recoveryErrorCount + " times. "
- + " Pipeline was " + pipelineMsg + ". Aborting...";
- LOG.warn(emsg);
- lastException = new IOException(emsg);
- streamerClosed = true;
- return false; // abort with IOexception
- }
- LOG.warn("Error Recovery for block " + block + " failed "
- + " because recovery from primary datanode " + primaryNode
- + " failed " + recoveryErrorCount + " times. "
- + " Pipeline was " + pipelineMsg + ". Will retry...");
- return true; // sleep when we return from here
- } finally {
- RPC.stopProxy(primary);
- }
- recoveryErrorCount = 0; // block recovery successful
-
- // If the block recovery generated a new generation stamp, use that
- // from now on. Also, setup new pipeline
- // newBlock should never be null and it should contain a newly
- // generated access token.
- block = newBlock.getBlock();
- accessToken = newBlock.getAccessToken();
- nodes = newBlock.getLocations();
-
- this.hasError = false;
- lastException = null;
- errorIndex = 0;
- success = createBlockOutputStream(nodes, clientName, true);
+ // get a new generation stamp and an access token
+ LocatedBlock lb = namenode.updateBlockForPipeline(block, clientName);
+ newGS = lb.getBlock().getGenerationStamp();
+ accessToken = lb.getAccessToken();
+
+ // set up the pipeline again with the remaining nodes
+ success = createBlockOutputStream(nodes, newGS, isRecovery);
}

- if (!streamerClosed && clientRunning) {
- response = new ResponseProcessor(nodes);
- response.start();
+ if (success) {
+ // update pipeline at the namenode
+ Block newBlock = new Block(
+ block.getBlockId(), block.getNumBytes(), newGS);
+ namenode.updatePipeline(clientName, block, newBlock, nodes);
+ // update client side generation stamp
+ block = newBlock;
}
return false; // do not sleep, continue processing
}
@@ -2956,7 +2950,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
do {
hasError = false;
lastException = null;
- errorIndex = 0;
+ errorIndex = -1;
retry = false;
success = false;

@@ -2970,7 +2964,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
//
// Connect to first DataNode in the list.
//
- success = createBlockOutputStream(nodes, clientName, false);
+ success = createBlockOutputStream(nodes, 0L, false);

if (!success) {
LOG.info("Abandoning block " + block);
@@ -2998,7 +2992,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
// connects to the first datanode in the pipeline
// Returns true if success, otherwise return failure.
//
- private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
+ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
boolean recoveryFlag) {
DataTransferProtocol.Status pipelineStatus = SUCCESS;
String firstBadLink = "";
@@ -3033,11 +3027,11 @@ public class DFSClient implements FSConstants, java.io.Closeable {
DataNode.SMALL_BUFFER_SIZE));
blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));

- // send the request: newGS now uses a dummy value 0 for now
+ // send the request
DataTransferProtocol.Sender.opWriteBlock(out,
block.getBlockId(), block.getGenerationStamp(),
- nodes.length, recoveryFlag?stage.getRecoveryStage():stage, 0,
- block.getNumBytes(), bytesSent, client, null, nodes, accessToken);
+ nodes.length, recoveryFlag?stage.getRecoveryStage():stage, newGS,
+ block.getNumBytes(), bytesSent, clientName, null, nodes, accessToken);
checksum.writeHeader(out);
out.flush();

@@ -3070,6 +3064,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
break;
}
}
+ } else {
+ errorIndex = 0;
}
hasError = true;
setLastException(ie);