@@ -1945,74 +1945,126 @@ public class DFSClient implements FSConstants, java.io.Closeable {
    * Grab the open-file info from namenode
    */
   synchronized void openInfo() throws IOException {
-    LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
+    for (int retries = 3; retries > 0; retries--) {
+      if (fetchLocatedBlocks()) {
+        // Fetched block locations successfully; nothing more to do.
+        return;
+      } else {
+        // Last block location unavailable. When a cluster restarts,
+        // DNs may not report in immediately, so the NN may not yet have
+        // the locations of the last (partial) block that are needed to
+        // compute the file length. Let's retry a few times.
+        DFSClient.LOG.warn("Last block locations unavailable. "
+            + "Datanodes might not have reported blocks completely."
+            + " Will retry " + retries + " times");
+        waitFor(4000);
+      }
+    }
+    throw new IOException("Could not obtain the last block locations.");
+  }
+
+  private void waitFor(int waitTime) throws InterruptedIOException {
+    try {
+      Thread.sleep(waitTime);
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException(
+          "Interrupted while getting the last block length.");
+    }
+  }
+
+  private boolean fetchLocatedBlocks() throws IOException,
+      FileNotFoundException {
+    LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0,
+        prefetchSize);
     if (newInfo == null) {
       throw new FileNotFoundException("File does not exist: " + src);
     }

-    // I think this check is not correct. A file could have been appended to
-    // between two calls to openInfo().
-    if (locatedBlocks != null && !locatedBlocks.isUnderConstruction() &&
-        !newInfo.isUnderConstruction()) {
-      Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
+    if (locatedBlocks != null && !locatedBlocks.isUnderConstruction()
+        && !newInfo.isUnderConstruction()) {
+      Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks()
+          .iterator();
       Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
       while (oldIter.hasNext() && newIter.hasNext()) {
-        if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
+        if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
           throw new IOException("Blocklist for " + src + " has changed!");
         }
       }
     }
-    updateBlockInfo(newInfo);
+    boolean isBlkInfoUpdated = updateBlockInfo(newInfo);
     this.locatedBlocks = newInfo;
     this.currentNode = null;
+    return isBlkInfoUpdated;
   }

   /**
    * For files under construction, update the last block size based
    * on the length of the block from the datanode.
    */
-  private void updateBlockInfo(LocatedBlocks newInfo) {
+  private boolean updateBlockInfo(LocatedBlocks newInfo) throws IOException {
     if (!serverSupportsHdfs200 || !newInfo.isUnderConstruction()
         || !(newInfo.locatedBlockCount() > 0)) {
-      return;
+      return true;
     }

     LocatedBlock last = newInfo.get(newInfo.locatedBlockCount() - 1);
     boolean lastBlockInFile = (last.getStartOffset() + last.getBlockSize() == newInfo
         .getFileLength());
-    if (!lastBlockInFile || last.getLocations().length <= 0) {
-      return;
+    if (!lastBlockInFile) {
+      return true;
     }
+
+    if (last.getLocations().length == 0) {
+      return false;
+    }
+
     ClientDatanodeProtocol primary = null;
-    DatanodeInfo primaryNode = last.getLocations()[0];
-    try {
-      primary = createClientDatanodeProtocolProxy(primaryNode, conf,
-          last.getBlock(), last.getBlockToken(), socketTimeout,
-          connectToDnViaHostname);
-      Block newBlock = primary.getBlockInfo(last.getBlock());
-      long newBlockSize = newBlock.getNumBytes();
-      long delta = newBlockSize - last.getBlockSize();
-      // if the size of the block on the datanode is different
-      // from what the NN knows about, the datanode wins!
-      last.getBlock().setNumBytes(newBlockSize);
-      long newlength = newInfo.getFileLength() + delta;
-      newInfo.setFileLength(newlength);
-      LOG.debug("DFSClient setting last block " + last + " to length "
-          + newBlockSize + " filesize is now " + newInfo.getFileLength());
-    } catch (IOException e) {
-      if (e.getMessage().startsWith(
-          "java.io.IOException: java.lang.NoSuchMethodException: "
-              + "org.apache.hadoop.hdfs.protocol"
-              + ".ClientDatanodeProtocol.getBlockInfo")) {
-        // We're talking to a server that doesn't implement HDFS-200.
-        serverSupportsHdfs200 = false;
-      } else {
-        LOG.debug("DFSClient file " + src
-            + " is being concurrently append to" + " but datanode "
-            + primaryNode.getHostName() + " probably does not have block "
-            + last.getBlock());
+    Block newBlock = null;
+    for (int i = 0; i < last.getLocations().length && newBlock == null; i++) {
+      DatanodeInfo datanode = last.getLocations()[i];
+      try {
+        primary = createClientDatanodeProtocolProxy(datanode, conf,
+            last.getBlock(), last.getBlockToken(), socketTimeout,
+            connectToDnViaHostname);
+        newBlock = primary.getBlockInfo(last.getBlock());
+      } catch (IOException e) {
+        if (e.getMessage().startsWith(
+            "java.io.IOException: java.lang.NoSuchMethodException: "
+                + "org.apache.hadoop.hdfs.protocol"
+                + ".ClientDatanodeProtocol.getBlockInfo")) {
+          // We're talking to a server that doesn't implement HDFS-200.
+          serverSupportsHdfs200 = false;
+        } else {
+          LOG.info("Failed to get block info from "
+              + datanode.getHostName() + "; it probably does not have block "
+              + last.getBlock(), e);
+        }
+      } finally {
+        if (primary != null) {
+          RPC.stopProxy(primary);
+        }
+      }
+    }
+
+    if (newBlock == null) {
+      if (!serverSupportsHdfs200) {
+        return true;
       }
+      throw new IOException(
+          "Failed to get block info from any of the DNs in the pipeline: "
+              + Arrays.toString(last.getLocations()));
     }
+
+    long newBlockSize = newBlock.getNumBytes();
+    long delta = newBlockSize - last.getBlockSize();
+    // if the size of the block on the datanode is different
+    // from what the NN knows about, the datanode wins!
+    last.getBlock().setNumBytes(newBlockSize);
+    long newlength = newInfo.getFileLength() + delta;
+    newInfo.setFileLength(newlength);
+    LOG.debug("DFSClient setting last block " + last + " to length "
+        + newBlockSize + " filesize is now " + newInfo.getFileLength());
+    return true;
   }

public synchronized long getFileLength() {
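
Taken together, the patch turns openInfo() into a bounded retry loop around fetchLocatedBlocks(): up to three attempts, a fixed 4-second wait after each failure, and an IOException only once all attempts are exhausted. Below is a minimal standalone sketch of that control flow; RetryableFetch, openWithRetries, and the constants are illustrative stand-ins for the patched methods, not DFSClient API.

import java.io.IOException;
import java.io.InterruptedIOException;

/** Sketch of the bounded-retry pattern introduced in openInfo(). */
public class BoundedRetry {

  /** Stand-in for fetchLocatedBlocks(): true on success, false when the
   *  last block's locations have not yet been reported to the NameNode. */
  interface RetryableFetch {
    boolean fetch() throws IOException;
  }

  static void openWithRetries(RetryableFetch fetch) throws IOException {
    for (int retries = 3; retries > 0; retries--) {
      if (fetch.fetch()) {
        return; // block locations (and hence the file length) are known
      }
      // DataNodes may not have re-registered yet after a cluster restart;
      // wait, then ask the NameNode again.
      try {
        Thread.sleep(4000);
      } catch (InterruptedException e) {
        throw new InterruptedIOException("Interrupted while retrying fetch.");
      }
    }
    throw new IOException("Could not obtain the last block locations.");
  }
}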
|
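Similarly, the reworked updateBlockInfo() no longer trusts only the first replica: it asks each DataNode in the pipeline until one returns the block info, and treats "all replicas failed" as a distinct outcome for the caller. A minimal sketch of that fallback pattern follows; BlockInfoSource and queryReplicas are illustrative names, not DFSClient API.

import java.io.IOException;
import java.util.List;

/** Sketch of the try-each-replica fallback used by updateBlockInfo(). */
public class ReplicaFallback {

  /** Stand-in for a ClientDatanodeProtocol proxy's getBlockInfo() call. */
  interface BlockInfoSource {
    long getBlockLength() throws IOException;
  }

  /**
   * Ask each replica in turn; the first successful answer wins.
   * Returns null when every replica fails, so the caller can decide
   * whether that is fatal (the patch throws an IOException in that case).
   */
  static Long queryReplicas(List<BlockInfoSource> replicas) {
    for (BlockInfoSource replica : replicas) {
      try {
        return replica.getBlockLength();
      } catch (IOException e) {
        // This replica is unreachable or lacks the block; try the next one.
      }
    }
    return null;
  }
}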