|
@@ -402,8 +402,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
}
|
|
|
|
|
|
// setup pipeline to append to the last block XXX retries??
|
|
|
- nodes = lastBlock.getLocations();
|
|
|
- storageIDs = lastBlock.getStorageIDs();
|
|
|
+ setPipeline(lastBlock);
|
|
|
errorIndex = -1; // no errors yet.
|
|
|
if (nodes.length < 1) {
|
|
|
throw new IOException("Unable to retrieve blocks locations " +
|
|
@@ -412,6 +411,14 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private void setPipeline(LocatedBlock lb) {
|
|
|
+ setPipeline(lb.getLocations(), lb.getStorageIDs());
|
|
|
+ }
|
|
|
+ private void setPipeline(DatanodeInfo[] nodes, String[] storageIDs) {
|
|
|
+ this.nodes = nodes;
|
|
|
+ this.storageIDs = storageIDs;
|
|
|
+ }
|
|
|
|
|
|
private void setFavoredNodes(String[] favoredNodes) {
|
|
|
this.favoredNodes = favoredNodes;
|
|
@@ -435,7 +442,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
this.setName("DataStreamer for file " + src);
|
|
|
closeResponder();
|
|
|
closeStream();
|
|
|
- nodes = null;
|
|
|
+ setPipeline(null, null);
|
|
|
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
|
|
}
|
|
|
|
|
@@ -504,7 +511,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
if(DFSClient.LOG.isDebugEnabled()) {
|
|
|
DFSClient.LOG.debug("Allocating new block");
|
|
|
}
|
|
|
- nodes = nextBlockOutputStream();
|
|
|
+ setPipeline(nextBlockOutputStream());
|
|
|
initDataStreaming();
|
|
|
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
|
|
|
if(DFSClient.LOG.isDebugEnabled()) {
|
|
@@ -912,7 +919,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
src, block, nodes, storageIDs,
|
|
|
failed.toArray(new DatanodeInfo[failed.size()]),
|
|
|
1, dfsClient.clientName);
|
|
|
- nodes = lb.getLocations();
|
|
|
+ setPipeline(lb);
|
|
|
|
|
|
//find the new datanode
|
|
|
final int d = findNewDatanode(original);
|
|
@@ -1012,7 +1019,14 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
|
|
|
System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
|
|
|
newnodes.length-errorIndex);
|
|
|
- nodes = newnodes;
|
|
|
+
|
|
|
+ final String[] newStorageIDs = new String[newnodes.length];
|
|
|
+ System.arraycopy(storageIDs, 0, newStorageIDs, 0, errorIndex);
|
|
|
+ System.arraycopy(storageIDs, errorIndex+1, newStorageIDs, errorIndex,
|
|
|
+ newStorageIDs.length-errorIndex);
|
|
|
+
|
|
|
+ setPipeline(newnodes, newStorageIDs);
|
|
|
+
|
|
|
hasError = false;
|
|
|
lastException.set(null);
|
|
|
errorIndex = -1;
|
|
@@ -1051,7 +1065,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
* Must get block ID and the IDs of the destinations from the namenode.
|
|
|
* Returns the list of target datanodes.
|
|
|
*/
|
|
|
- private DatanodeInfo[] nextBlockOutputStream() throws IOException {
|
|
|
+ private LocatedBlock nextBlockOutputStream() throws IOException {
|
|
|
LocatedBlock lb = null;
|
|
|
DatanodeInfo[] nodes = null;
|
|
|
int count = dfsClient.getConf().nBlockWriteRetry;
|
|
@@ -1093,7 +1107,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|
|
if (!success) {
|
|
|
throw new IOException("Unable to create new block.");
|
|
|
}
|
|
|
- return nodes;
|
|
|
+ return lb;
|
|
|
}
|
|
|
|
|
|
// connects to the first datanode in the pipeline
|