|
@@ -496,6 +496,7 @@ class DFSClient implements FSConstants {
|
|
|
private long pos = 0;
|
|
|
private long filelen = 0;
|
|
|
private long blockEnd = -1;
|
|
|
+ private TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
|
|
|
|
|
|
/**
|
|
|
*/
|
|
@@ -565,7 +566,7 @@ class DFSClient implements FSConstants {
|
|
|
* Open a DataInputStream to a DataNode so that it can be read from.
|
|
|
* We get block ID and the IDs of the destinations at startup, from the namenode.
|
|
|
*/
|
|
|
- private synchronized DatanodeInfo blockSeekTo(long target, TreeSet deadNodes) throws IOException {
|
|
|
+ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
|
|
|
if (target >= filelen) {
|
|
|
throw new IOException("Attempted to read past end of file");
|
|
|
}
|
|
@@ -600,10 +601,9 @@ class DFSClient implements FSConstants {
|
|
|
//
|
|
|
// Connect to best DataNode for desired Block, with potential offset
|
|
|
//
|
|
|
- int failures = 0;
|
|
|
DatanodeInfo chosenNode = null;
|
|
|
while (s == null) {
|
|
|
- DNAddrPair retval = chooseDataNode(targetBlock, deadNodes);
|
|
|
+ DNAddrPair retval = chooseDataNode(targetBlock);
|
|
|
chosenNode = retval.info;
|
|
|
InetSocketAddress targetAddr = retval.addr;
|
|
|
|
|
@@ -685,7 +685,7 @@ class DFSClient implements FSConstants {
|
|
|
int result = -1;
|
|
|
if (pos < filelen) {
|
|
|
if (pos > blockEnd) {
|
|
|
- currentNode = blockSeekTo(pos, new TreeSet());
|
|
|
+ currentNode = blockSeekTo(pos);
|
|
|
}
|
|
|
result = blockStream.read();
|
|
|
if (result >= 0) {
|
|
@@ -705,14 +705,10 @@ class DFSClient implements FSConstants {
|
|
|
}
|
|
|
if (pos < filelen) {
|
|
|
int retries = 2;
|
|
|
- TreeSet deadNodes = null;
|
|
|
while (retries > 0) {
|
|
|
try {
|
|
|
if (pos > blockEnd) {
|
|
|
- if (deadNodes == null) {
|
|
|
- deadNodes = new TreeSet();
|
|
|
- }
|
|
|
- currentNode = blockSeekTo(pos, deadNodes);
|
|
|
+ currentNode = blockSeekTo(pos);
|
|
|
}
|
|
|
int realLen = Math.min(len, (int) (blockEnd - pos + 1));
|
|
|
int result = blockStream.read(buf, off, realLen);
|
|
@@ -723,7 +719,6 @@ class DFSClient implements FSConstants {
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
|
|
|
blockEnd = -1;
|
|
|
- if (deadNodes == null) { deadNodes = new TreeSet(); }
|
|
|
if (currentNode != null) { deadNodes.add(currentNode); }
|
|
|
if (--retries == 0) {
|
|
|
throw e;
|
|
@@ -735,7 +730,7 @@ class DFSClient implements FSConstants {
|
|
|
}
|
|
|
|
|
|
|
|
|
- private DNAddrPair chooseDataNode(int blockId, TreeSet deadNodes)
|
|
|
+ private DNAddrPair chooseDataNode(int blockId)
|
|
|
throws IOException {
|
|
|
int failures = 0;
|
|
|
while (true) {
|
|
@@ -757,7 +752,7 @@ class DFSClient implements FSConstants {
|
|
|
Thread.sleep(3000);
|
|
|
} catch (InterruptedException iex) {
|
|
|
}
|
|
|
- deadNodes.clear();
|
|
|
+ deadNodes.clear(); //2nd option is to remove only nodes[blockId]
|
|
|
openInfo();
|
|
|
failures++;
|
|
|
continue;
|
|
@@ -770,10 +765,9 @@ class DFSClient implements FSConstants {
|
|
|
//
|
|
|
// Connect to best DataNode for desired Block, with potential offset
|
|
|
//
|
|
|
- TreeSet deadNodes = new TreeSet();
|
|
|
Socket dn = null;
|
|
|
while (dn == null) {
|
|
|
- DNAddrPair retval = chooseDataNode(blockId, deadNodes);
|
|
|
+ DNAddrPair retval = chooseDataNode(blockId);
|
|
|
DatanodeInfo chosenNode = retval.info;
|
|
|
InetSocketAddress targetAddr = retval.addr;
|
|
|
|
|
@@ -900,11 +894,16 @@ class DFSClient implements FSConstants {
|
|
|
* If another node could not be found, then returns false.
|
|
|
*/
|
|
|
public synchronized boolean seekToNewSource(long targetPos) throws IOException {
|
|
|
- TreeSet excludeNodes = new TreeSet();
|
|
|
- excludeNodes.add(currentNode);
|
|
|
- String oldNodeID = currentNode.getStorageID();
|
|
|
- DatanodeInfo newNode = blockSeekTo(targetPos, excludeNodes);
|
|
|
- if (!oldNodeID.equals(newNode.getStorageID())) {
|
|
|
+ boolean markedDead = deadNodes.contains(currentNode);
|
|
|
+ deadNodes.add(currentNode);
|
|
|
+ DatanodeInfo oldNode = currentNode;
|
|
|
+ DatanodeInfo newNode = blockSeekTo(targetPos);
|
|
|
+ if ( !markedDead ) {
|
|
|
+ /* remove it from deadNodes. blockSeekTo could have cleared
|
|
|
+ * deadNodes and added currentNode again. Thats ok. */
|
|
|
+ deadNodes.remove(oldNode);
|
|
|
+ }
|
|
|
+ if (!oldNode.getStorageID().equals(newNode.getStorageID())) {
|
|
|
currentNode = newNode;
|
|
|
return true;
|
|
|
} else {
|