|
@@ -1208,22 +1208,46 @@ class DataStreamer extends Daemon {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
- //get a new datanode
|
|
|
|
|
|
+ int tried = 0;
|
|
final DatanodeInfo[] original = nodes;
|
|
final DatanodeInfo[] original = nodes;
|
|
- final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
|
|
|
|
- src, stat.getFileId(), block, nodes, storageIDs,
|
|
|
|
- failed.toArray(new DatanodeInfo[failed.size()]),
|
|
|
|
- 1, dfsClient.clientName);
|
|
|
|
- setPipeline(lb);
|
|
|
|
-
|
|
|
|
- //find the new datanode
|
|
|
|
- final int d = findNewDatanode(original);
|
|
|
|
-
|
|
|
|
- //transfer replica
|
|
|
|
- final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
|
|
|
|
- final DatanodeInfo[] targets = {nodes[d]};
|
|
|
|
- final StorageType[] targetStorageTypes = {storageTypes[d]};
|
|
|
|
- transfer(src, targets, targetStorageTypes, lb.getBlockToken());
|
|
|
|
|
|
+ final StorageType[] originalTypes = storageTypes;
|
|
|
|
+ final String[] originalIDs = storageIDs;
|
|
|
|
+ IOException caughtException = null;
|
|
|
|
+ ArrayList<DatanodeInfo> exclude = new ArrayList<DatanodeInfo>(failed);
|
|
|
|
+ while (tried < 3) {
|
|
|
|
+ LocatedBlock lb;
|
|
|
|
+ //get a new datanode
|
|
|
|
+ lb = dfsClient.namenode.getAdditionalDatanode(
|
|
|
|
+ src, stat.getFileId(), block, nodes, storageIDs,
|
|
|
|
+ exclude.toArray(new DatanodeInfo[exclude.size()]),
|
|
|
|
+ 1, dfsClient.clientName);
|
|
|
|
+ // a new node was allocated by the namenode. Update nodes.
|
|
|
|
+ setPipeline(lb);
|
|
|
|
+
|
|
|
|
+ //find the new datanode
|
|
|
|
+ final int d = findNewDatanode(original);
|
|
|
|
+ //transfer replica. pick a source from the original nodes
|
|
|
|
+ final DatanodeInfo src = original[tried % original.length];
|
|
|
|
+ final DatanodeInfo[] targets = {nodes[d]};
|
|
|
|
+ final StorageType[] targetStorageTypes = {storageTypes[d]};
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ transfer(src, targets, targetStorageTypes, lb.getBlockToken());
|
|
|
|
+ } catch (IOException ioe) {
|
|
|
|
+ DFSClient.LOG.warn("Error transferring data from " + src + " to " +
|
|
|
|
+ nodes[d] + ": " + ioe.getMessage());
|
|
|
|
+ caughtException = ioe;
|
|
|
|
+ // add the allocated node to the exclude list.
|
|
|
|
+ exclude.add(nodes[d]);
|
|
|
|
+ setPipeline(original, originalTypes, originalIDs);
|
|
|
|
+ tried++;
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ return; // finished successfully
|
|
|
|
+ }
|
|
|
|
+ // All retries failed
|
|
|
|
+ throw (caughtException != null) ? caughtException :
|
|
|
|
+ new IOException("Failed to add a node");
|
|
}
|
|
}
|
|
|
|
|
|
private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
|
|
private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
|
|
@@ -1236,7 +1260,11 @@ class DataStreamer extends Daemon {
|
|
try {
|
|
try {
|
|
sock = createSocketForPipeline(src, 2, dfsClient);
|
|
sock = createSocketForPipeline(src, 2, dfsClient);
|
|
final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
|
|
final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
|
|
- final long readTimeout = dfsClient.getDatanodeReadTimeout(2);
|
|
|
|
|
|
+
|
|
|
|
+ // transfer timeout multiplier based on the transfer size
|
|
|
|
+ // One per 200 packets = 12.8MB. Minimum is 2.
|
|
|
|
+ int multi = 2 + (int)(bytesSent/dfsClient.getConf().getWritePacketSize())/200;
|
|
|
|
+ final long readTimeout = dfsClient.getDatanodeReadTimeout(multi);
|
|
|
|
|
|
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
|
|
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
|
|
InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
|
|
InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
|