|
@@ -34,7 +34,9 @@ import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
|
|
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|
|
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
|
|
+import org.apache.hadoop.security.InvalidAccessTokenException;
|
|
|
import org.apache.hadoop.security.AccessControlException;
|
|
|
+import org.apache.hadoop.security.AccessToken;
|
|
|
import org.apache.hadoop.security.UnixUserGroupInformation;
|
|
|
import org.apache.hadoop.util.*;
|
|
|
|
|
@@ -638,14 +640,21 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout
|
|
|
) throws IOException {
|
|
|
//get all block locations
|
|
|
- final List<LocatedBlock> locatedblocks
|
|
|
+ List<LocatedBlock> locatedblocks
|
|
|
= callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE).getLocatedBlocks();
|
|
|
final DataOutputBuffer md5out = new DataOutputBuffer();
|
|
|
int bytesPerCRC = 0;
|
|
|
long crcPerBlock = 0;
|
|
|
+ boolean refetchBlocks = false;
|
|
|
+ int lastRetriedIndex = -1;
|
|
|
|
|
|
//get block checksum for each block
|
|
|
for(int i = 0; i < locatedblocks.size(); i++) {
|
|
|
+ if (refetchBlocks) { // refetch to get fresh tokens
|
|
|
+ locatedblocks = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE)
|
|
|
+ .getLocatedBlocks();
|
|
|
+ refetchBlocks = false;
|
|
|
+ }
|
|
|
LocatedBlock lb = locatedblocks.get(i);
|
|
|
final Block block = lb.getBlock();
|
|
|
final DatanodeInfo[] datanodes = lb.getLocations();
|
|
@@ -677,12 +686,28 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
out.write(DataTransferProtocol.OP_BLOCK_CHECKSUM);
|
|
|
out.writeLong(block.getBlockId());
|
|
|
out.writeLong(block.getGenerationStamp());
|
|
|
+ lb.getAccessToken().write(out);
|
|
|
out.flush();
|
|
|
|
|
|
final short reply = in.readShort();
|
|
|
if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
|
|
|
- throw new IOException("Bad response " + reply + " for block "
|
|
|
- + block + " from datanode " + datanodes[j].getName());
|
|
|
+ if (reply == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN
|
|
|
+ && i > lastRetriedIndex) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
|
|
|
+ + "for file " + src + " for block " + block
|
|
|
+ + " from datanode " + datanodes[j].getName()
|
|
|
+ + ". Will retry the block once.");
|
|
|
+ }
|
|
|
+ lastRetriedIndex = i;
|
|
|
+ done = true; // actually it's not done; but we'll retry
|
|
|
+ i--; // repeat at i-th block
|
|
|
+ refetchBlocks = true;
|
|
|
+ break;
|
|
|
+ } else {
|
|
|
+ throw new IOException("Bad response " + reply + " for block "
|
|
|
+ + block + " from datanode " + datanodes[j].getName());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
//read byte-per-checksum
|
|
@@ -1325,24 +1350,26 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
checksumSize = this.checksum.getChecksumSize();
|
|
|
}
|
|
|
|
|
|
- public static BlockReader newBlockReader(Socket sock, String file, long blockId,
|
|
|
+ public static BlockReader newBlockReader(Socket sock, String file, long blockId, AccessToken accessToken,
|
|
|
long genStamp, long startOffset, long len, int bufferSize) throws IOException {
|
|
|
- return newBlockReader(sock, file, blockId, genStamp, startOffset, len, bufferSize,
|
|
|
+ return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, len, bufferSize, // convenience overload: forwards every argument (including accessToken) and passes true for verifyChecksum
|
|
|
true);
|
|
|
}
|
|
|
|
|
|
/** Java Doc required */
|
|
|
public static BlockReader newBlockReader( Socket sock, String file, long blockId,
|
|
|
+ AccessToken accessToken, // forwarded unchanged to the delegate call below
|
|
|
long genStamp,
|
|
|
long startOffset, long len,
|
|
|
int bufferSize, boolean verifyChecksum)
|
|
|
throws IOException {
|
|
|
- return newBlockReader(sock, file, blockId, genStamp, startOffset,
|
|
|
+ return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, // delegates to the overload taking one more trailing String argument, passing "" for it
|
|
|
len, bufferSize, verifyChecksum, "");
|
|
|
}
|
|
|
|
|
|
public static BlockReader newBlockReader( Socket sock, String file,
|
|
|
long blockId,
|
|
|
+ AccessToken accessToken,
|
|
|
long genStamp,
|
|
|
long startOffset, long len,
|
|
|
int bufferSize, boolean verifyChecksum,
|
|
@@ -1360,6 +1387,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
out.writeLong( startOffset );
|
|
|
out.writeLong( len );
|
|
|
Text.writeString(out, clientName);
|
|
|
+ accessToken.write(out);
|
|
|
out.flush();
|
|
|
|
|
|
//
|
|
@@ -1370,10 +1398,16 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
new BufferedInputStream(NetUtils.getInputStream(sock),
|
|
|
bufferSize));
|
|
|
|
|
|
- if ( in.readShort() != DataTransferProtocol.OP_STATUS_SUCCESS ) {
|
|
|
- throw new IOException("Got error in response to OP_READ_BLOCK " +
|
|
|
- "for file " + file +
|
|
|
- " for block " + blockId);
|
|
|
+ short status = in.readShort();
|
|
|
+ if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
|
|
|
+ if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
|
|
|
+ throw new InvalidAccessTokenException(
|
|
|
+ "Got access token error in response to OP_READ_BLOCK "
|
|
|
+ + "for file " + file + " for block " + blockId);
|
|
|
+ } else {
|
|
|
+ throw new IOException("Got error in response to OP_READ_BLOCK "
|
|
|
+ + "for file " + file + " for block " + blockId);
|
|
|
+ }
|
|
|
}
|
|
|
DataChecksum checksum = DataChecksum.newDataChecksum( in );
|
|
|
//Warning when we get CHECKSUM_NULL?
|
|
@@ -1520,7 +1554,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
* @return located block
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- private LocatedBlock getBlockAt(long offset) throws IOException {
|
|
|
+ private synchronized LocatedBlock getBlockAt(long offset) throws IOException {
|
|
|
assert (locatedBlocks != null) : "locatedBlocks is null";
|
|
|
// search cached blocks first
|
|
|
int targetBlockIdx = locatedBlocks.findBlock(offset);
|
|
@@ -1540,6 +1574,32 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
return blk;
|
|
|
}
|
|
|
|
|
|
+ /** Fetch blocks at {@code offset} from the namenode (via callGetBlockLocations) and insert them into the cached locatedBlocks; throws IOException if the namenode reply is null */
|
|
|
+ private synchronized void fetchAndCacheBlockAt(long offset) throws IOException {
|
|
|
+ int targetBlockIdx = locatedBlocks.findBlock(offset); // look up offset in the cached block list
|
|
|
+ if (targetBlockIdx < 0) { // negative result: block is not cached
|
|
|
+ targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx); // turn the negative result into the index handed to insertRange below
|
|
|
+ }
|
|
|
+ // fetch blocks from the namenode for this offset, passing prefetchSize
|
|
|
+ LocatedBlocks newBlocks;
|
|
|
+ newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
|
|
|
+ if (newBlocks == null) {
|
|
|
+ throw new IOException("Could not find target position " + offset);
|
|
|
+ }
|
|
|
+ locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks()); // insert the fetched blocks at that index; NOTE(review): when the block was already cached (index >= 0) this presumably replaces the stale entry - confirm insertRange semantics
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Fetch a block without caching: asks the namenode (via callGetBlockLocations) for blocks at {@code offset} and returns the one found at that offset; does not itself modify locatedBlocks; throws IOException if the reply is null */
|
|
|
+ private LocatedBlock fetchBlockAt(long offset) throws IOException {
|
|
|
+ LocatedBlocks newBlocks;
|
|
|
+ newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
|
|
|
+ if (newBlocks == null) {
|
|
|
+ throw new IOException("Could not find target position " + offset);
|
|
|
+ }
|
|
|
+ int index = newBlocks.findBlock(offset); // search only the freshly fetched result, not the cached list
|
|
|
+ return newBlocks.get(index); // NOTE(review): index is not checked for < 0 here, although the findBlock result is checked for < 0 in fetchAndCacheBlockAt - confirm the reply always covers offset
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Get blocks in the specified range.
|
|
|
* Fetch them from the namenode if not cached.
|
|
@@ -1600,18 +1660,19 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
s = null;
|
|
|
}
|
|
|
|
|
|
- //
|
|
|
- // Compute desired block
|
|
|
- //
|
|
|
- LocatedBlock targetBlock = getBlockAt(target);
|
|
|
- assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
|
|
|
- long offsetIntoBlock = target - targetBlock.getStartOffset();
|
|
|
-
|
|
|
//
|
|
|
// Connect to best DataNode for desired Block, with potential offset
|
|
|
//
|
|
|
DatanodeInfo chosenNode = null;
|
|
|
- while (s == null) {
|
|
|
+ int refetchToken = 1; // only need to get a new access token once
|
|
|
+ while (true) {
|
|
|
+ //
|
|
|
+ // Compute desired block
|
|
|
+ //
|
|
|
+ LocatedBlock targetBlock = getBlockAt(target);
|
|
|
+ assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
|
|
|
+ long offsetIntoBlock = target - targetBlock.getStartOffset();
|
|
|
+
|
|
|
DNAddrPair retval = chooseDataNode(targetBlock);
|
|
|
chosenNode = retval.info;
|
|
|
InetSocketAddress targetAddr = retval.addr;
|
|
@@ -1621,17 +1682,33 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
NetUtils.connect(s, targetAddr, socketTimeout);
|
|
|
s.setSoTimeout(socketTimeout);
|
|
|
Block blk = targetBlock.getBlock();
|
|
|
+ AccessToken accessToken = targetBlock.getAccessToken();
|
|
|
|
|
|
blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(),
|
|
|
+ accessToken,
|
|
|
blk.getGenerationStamp(),
|
|
|
offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
|
|
|
buffersize, verifyChecksum, clientName);
|
|
|
return chosenNode;
|
|
|
} catch (IOException ex) {
|
|
|
- // Put chosen node into dead list, continue
|
|
|
LOG.debug("Failed to connect to " + targetAddr + ":"
|
|
|
+ StringUtils.stringifyException(ex));
|
|
|
- addToDeadNodes(chosenNode);
|
|
|
+ if (ex instanceof InvalidAccessTokenException && refetchToken-- > 0) {
|
|
|
+ /*
|
|
|
+ * Get a new access token and retry. Retry is needed in 2 cases. 1)
|
|
|
+ * When both NN and DN are restarted while DFSClient is holding a cached
|
|
|
+ * access token. 2) In the case that NN fails to update its
|
|
|
+ * access key at pre-set interval (by a wide margin) and
|
|
|
+ * subsequently restarts. In this case, DN re-registers itself with
|
|
|
+ * NN and receives a new access key, but DN will delete the old
|
|
|
+ * access key from its memory since it's considered expired based on
|
|
|
+ * the estimated expiration date.
|
|
|
+ */
|
|
|
+ fetchAndCacheBlockAt(target);
|
|
|
+ } else {
|
|
|
+ // Put chosen node into dead list, continue
|
|
|
+ addToDeadNodes(chosenNode);
|
|
|
+ }
|
|
|
if (s != null) {
|
|
|
try {
|
|
|
s.close();
|
|
@@ -1641,7 +1718,6 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
s = null;
|
|
|
}
|
|
|
}
|
|
|
- return chosenNode;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1811,6 +1887,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
Socket dn = null;
|
|
|
int numAttempts = block.getLocations().length;
|
|
|
IOException ioe = null;
|
|
|
+ int refetchToken = 1; // only need to get a new access token once
|
|
|
|
|
|
while (dn == null && numAttempts-- > 0 ) {
|
|
|
DNAddrPair retval = chooseDataNode(block);
|
|
@@ -1822,11 +1899,13 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
dn = socketFactory.createSocket();
|
|
|
NetUtils.connect(dn, targetAddr, socketTimeout);
|
|
|
dn.setSoTimeout(socketTimeout);
|
|
|
+ AccessToken accessToken = block.getAccessToken();
|
|
|
|
|
|
int len = (int) (end - start + 1);
|
|
|
|
|
|
reader = BlockReader.newBlockReader(dn, src,
|
|
|
block.getBlock().getBlockId(),
|
|
|
+ accessToken,
|
|
|
block.getBlock().getGenerationStamp(),
|
|
|
start, len, buffersize,
|
|
|
verifyChecksum, clientName);
|
|
@@ -1844,10 +1923,20 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
reportChecksumFailure(src, block.getBlock(), chosenNode);
|
|
|
} catch (IOException e) {
|
|
|
ioe = e;
|
|
|
- LOG.warn("Failed to connect to " + targetAddr +
|
|
|
- " for file " + src +
|
|
|
- " for block " + block.getBlock().getBlockId() + ":" +
|
|
|
- StringUtils.stringifyException(e));
|
|
|
+ if (e instanceof InvalidAccessTokenException && refetchToken-- > 0) {
|
|
|
+ LOG.info("Invalid access token when connecting to " + targetAddr
|
|
|
+ + " for file " + src + " for block "
|
|
|
+ + block.getBlock() + ":"
|
|
|
+ + StringUtils.stringifyException(e)
|
|
|
+ + ", get a new access token and retry...");
|
|
|
+ block = fetchBlockAt(block.getStartOffset());
|
|
|
+ numAttempts = block.getLocations().length;
|
|
|
+ continue;
|
|
|
+ } else {
|
|
|
+ LOG.warn("Failed to connect to " + targetAddr + " for file " + src
|
|
|
+ + " for block " + block.getBlock() + ":"
|
|
|
+ + StringUtils.stringifyException(e));
|
|
|
+ }
|
|
|
} finally {
|
|
|
IOUtils.closeStream(reader);
|
|
|
IOUtils.closeSocket(dn);
|
|
@@ -2081,6 +2170,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
private DataOutputStream blockStream;
|
|
|
private DataInputStream blockReplyStream;
|
|
|
private Block block;
|
|
|
+ private AccessToken accessToken;
|
|
|
final private long blockSize;
|
|
|
private DataChecksum checksum;
|
|
|
private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
|
|
@@ -2600,6 +2690,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
//
|
|
|
if (newBlock != null) {
|
|
|
block = newBlock.getBlock();
|
|
|
+ accessToken = newBlock.getAccessToken();
|
|
|
nodes = newBlock.getLocations();
|
|
|
}
|
|
|
|
|
@@ -2785,6 +2876,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
long startTime = System.currentTimeMillis();
|
|
|
lb = locateFollowingBlock(startTime);
|
|
|
block = lb.getBlock();
|
|
|
+ accessToken = lb.getAccessToken();
|
|
|
nodes = lb.getLocations();
|
|
|
|
|
|
//
|
|
@@ -2861,6 +2953,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
|
|
for (int i = 1; i < nodes.length; i++) {
|
|
|
nodes[i].write(out);
|
|
|
}
|
|
|
+ accessToken.write(out);
|
|
|
checksum.writeHeader( out );
|
|
|
out.flush();
|
|
|
|