@@ -19,10 +19,8 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -43,7 +41,6 @@ import org.apache.hadoop.fs.HdfsVolumeId;
 import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -51,16 +48,20 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.token.Token;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 class BlockStorageLocationUtil {
 
-  private static final Log LOG = LogFactory
+  static final Log LOG = LogFactory
       .getLog(BlockStorageLocationUtil.class);
 
   /**
    * Create a list of {@link VolumeBlockLocationCallable} corresponding to a set
-   * of datanodes and blocks.
+   * of datanodes and blocks. The blocks must all correspond to the same
+   * block pool.
    *
    * @param datanodeBlocks
    *          Map of datanodes to block replicas at each datanode
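
The two Guava imports added above back collection changes later in the patch: Lists.newArrayList() for the new empty-input early return, and Maps.newHashMapWithExpectedSize() for the result map. As a hedged aside (plain Guava, nothing HDFS-specific): unlike new HashMap<K, V>(n), whose argument is a raw table capacity that still triggers a resize once n * 0.75 entries are exceeded, the Guava factory oversizes the table so that n insertions fit without rehashing. A minimal illustration:

    import java.util.HashMap;
    import java.util.Map;

    import com.google.common.collect.Maps;

    public class ExpectedSizeDemo {
      public static void main(String[] args) {
        // Raw capacity 16 resizes on the 13th insertion (16 * 0.75 = 12).
        Map<Integer, Integer> plain = new HashMap<Integer, Integer>(16);
        // Expected size 16: sized so all 16 entries fit without a rehash.
        Map<Integer, Integer> sized = Maps.newHashMapWithExpectedSize(16);
        for (int i = 0; i < 16; i++) {
          plain.put(i, i);
          sized.put(i, i);
        }
      }
    }
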
@@ -70,6 +71,11 @@ class BlockStorageLocationUtil {
   private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallables(
       Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
       int timeout, boolean connectToDnViaHostname) {
+
+    if (datanodeBlocks.isEmpty()) {
+      return Lists.newArrayList();
+    }
+
     // Construct the callables, one per datanode
     List<VolumeBlockLocationCallable> callables =
         new ArrayList<VolumeBlockLocationCallable>();
@@ -78,17 +84,32 @@ class BlockStorageLocationUtil {
       // Construct RPC parameters
       DatanodeInfo datanode = entry.getKey();
       List<LocatedBlock> locatedBlocks = entry.getValue();
-      List<ExtendedBlock> extendedBlocks =
-          new ArrayList<ExtendedBlock>(locatedBlocks.size());
+      if (locatedBlocks.isEmpty()) {
+        continue;
+      }
+
+      // Ensure that the blocks all are from the same block pool.
+      String poolId = locatedBlocks.get(0).getBlock().getBlockPoolId();
+      for (LocatedBlock lb : locatedBlocks) {
+        if (!poolId.equals(lb.getBlock().getBlockPoolId())) {
+          throw new IllegalArgumentException(
+              "All blocks to be queried must be in the same block pool: " +
+              locatedBlocks.get(0).getBlock() + " and " + lb +
+              " are from different pools.");
+        }
+      }
+
+      long[] blockIds = new long[locatedBlocks.size()];
+      int i = 0;
       List<Token<BlockTokenIdentifier>> dnTokens =
           new ArrayList<Token<BlockTokenIdentifier>>(
               locatedBlocks.size());
       for (LocatedBlock b : locatedBlocks) {
-        extendedBlocks.add(b.getBlock());
+        blockIds[i++] = b.getBlock().getBlockId();
         dnTokens.add(b.getBlockToken());
       }
       VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable(
-          conf, datanode, extendedBlocks, dnTokens, timeout,
+          conf, datanode, poolId, blockIds, dnTokens, timeout,
          connectToDnViaHostname);
       callables.add(callable);
     }
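
The hunk above is the core of the wire-format change: instead of shipping a List<ExtendedBlock>, each callable now carries a single pool ID plus a bare long[] of block IDs. This is only safe because block IDs are unique within one block pool, which is exactly the invariant the new IllegalArgumentException enforces. A minimal standalone sketch of the same extraction (the helper name is hypothetical; the accessors all appear in this patch):

    import java.util.List;

    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    class BlockIdExtractor {
      // Within one block pool, (poolId, blockId) identifies a block as
      // precisely as a full ExtendedBlock does.
      static long[] toBlockIds(String poolId, List<LocatedBlock> blocks) {
        long[] ids = new long[blocks.size()];
        int i = 0;
        for (LocatedBlock lb : blocks) {
          if (!poolId.equals(lb.getBlock().getBlockPoolId())) {
            throw new IllegalArgumentException(
                "Block " + lb + " is not in pool " + poolId);
          }
          ids[i++] = lb.getBlock().getBlockId();
        }
        return ids;
      }
    }
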
@@ -102,18 +123,17 @@ class BlockStorageLocationUtil {
    *
    * @param datanodeBlocks
    *          Map of datanodes to the blocks present on the DN
-   * @return metadatas List of block metadata for each datanode, specifying
-   *         volume locations for each block
+   * @return metadatas Map of datanodes to block metadata of the DN
    * @throws InvalidBlockTokenException
    *           if client does not have read access on a requested block
    */
-  static List<HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata(
+  static Map<DatanodeInfo, HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata(
       Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
-      int poolsize, int timeout, boolean connectToDnViaHostname)
+      int poolsize, int timeoutMs, boolean connectToDnViaHostname)
       throws InvalidBlockTokenException {
 
     List<VolumeBlockLocationCallable> callables =
-        createVolumeBlockLocationCallables(conf, datanodeBlocks, timeout,
+        createVolumeBlockLocationCallables(conf, datanodeBlocks, timeoutMs,
             connectToDnViaHostname);
 
     // Use a thread pool to execute the Callables in parallel
@@ -121,27 +141,24 @@ class BlockStorageLocationUtil {
         new ArrayList<Future<HdfsBlocksMetadata>>();
     ExecutorService executor = new ScheduledThreadPoolExecutor(poolsize);
     try {
-      futures = executor.invokeAll(callables, timeout, TimeUnit.SECONDS);
+      futures = executor.invokeAll(callables, timeoutMs,
+          TimeUnit.MILLISECONDS);
     } catch (InterruptedException e) {
       // Swallow the exception here, because we can return partial results
     }
     executor.shutdown();
 
-    // Initialize metadatas list with nulls
-    // This is used to later indicate if we didn't get a response from a DN
-    List<HdfsBlocksMetadata> metadatas = new ArrayList<HdfsBlocksMetadata>();
-    for (int i = 0; i < futures.size(); i++) {
-      metadatas.add(null);
-    }
+    Map<DatanodeInfo, HdfsBlocksMetadata> metadatas =
+        Maps.newHashMapWithExpectedSize(datanodeBlocks.size());
     // Fill in metadatas with results from DN RPCs, where possible
     for (int i = 0; i < futures.size(); i++) {
+      VolumeBlockLocationCallable callable = callables.get(i);
+      DatanodeInfo datanode = callable.getDatanodeInfo();
       Future<HdfsBlocksMetadata> future = futures.get(i);
       try {
         HdfsBlocksMetadata metadata = future.get();
-        metadatas.set(i, metadata);
+        metadatas.put(callable.getDatanodeInfo(), metadata);
       } catch (ExecutionException e) {
-        VolumeBlockLocationCallable callable = callables.get(i);
-        DatanodeInfo datanode = callable.getDatanodeInfo();
         Throwable t = e.getCause();
         if (t instanceof InvalidBlockTokenException) {
           LOG.warn("Invalid access token when trying to retrieve "
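
Two fixes land in the hunk above: the timeout is now interpreted in milliseconds (the timeoutMs rename makes the unit explicit, where the old code passed the same value to invokeAll as seconds), and results are keyed by DatanodeInfo rather than by list position. The partial-results behavior rests on the documented semantics of ExecutorService.invokeAll(tasks, timeout, unit): it always returns one Future per task, cancelling any stragglers, so get() on each future either yields a value, throws ExecutionException, or throws CancellationException. A self-contained sketch of those semantics (plain JDK; written with lambdas for brevity):

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.*;

    public class InvokeAllDemo {
      public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Callable<String>> tasks = Arrays.asList(
            () -> "fast",
            () -> { Thread.sleep(5000); return "slow"; });
        // Returns when all tasks finish or the timeout elapses; unfinished
        // tasks come back as cancelled futures rather than being dropped.
        List<Future<String>> futures =
            pool.invokeAll(tasks, 100, TimeUnit.MILLISECONDS);
        for (Future<String> f : futures) {
          try {
            System.out.println(f.get());      // prints "fast"
          } catch (CancellationException e) {
            System.out.println("timed out");  // the slow task
          } catch (ExecutionException e) {
            System.out.println("failed: " + e.getCause());
          }
        }
        pool.shutdown();
      }
    }
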
@@ -153,8 +170,8 @@ class BlockStorageLocationUtil {
               + " required #getHdfsBlocksMetadata() API");
           throw (UnsupportedOperationException) t;
         } else {
-          LOG.info("Failed to connect to datanode " +
-              datanode.getIpcAddr(false));
+          LOG.info("Failed to query block locations on datanode " +
+              datanode.getIpcAddr(false) + ": " + t);
         }
         if (LOG.isDebugEnabled()) {
           LOG.debug("Could not fetch information from datanode", t);
@@ -175,23 +192,21 @@ class BlockStorageLocationUtil {
    *
    * @param blocks
    *          Original LocatedBlock array
-   * @param datanodeBlocks
-   *          Mapping from datanodes to the list of replicas on each datanode
    * @param metadatas
    *          VolumeId information for the replicas on each datanode
    * @return blockVolumeIds per-replica VolumeId information associated with the
    *         parent LocatedBlock
    */
   static Map<LocatedBlock, List<VolumeId>> associateVolumeIdsWithBlocks(
-      List<LocatedBlock> blocks, Map<DatanodeInfo,
-      List<LocatedBlock>> datanodeBlocks, List<HdfsBlocksMetadata> metadatas) {
+      List<LocatedBlock> blocks,
+      Map<DatanodeInfo, HdfsBlocksMetadata> metadatas) {
 
     // Initialize mapping of ExtendedBlock to LocatedBlock.
     // Used to associate results from DN RPCs to the parent LocatedBlock
-    Map<ExtendedBlock, LocatedBlock> extBlockToLocBlock =
-        new HashMap<ExtendedBlock, LocatedBlock>();
+    Map<Long, LocatedBlock> blockIdToLocBlock =
+        new HashMap<Long, LocatedBlock>();
     for (LocatedBlock b : blocks) {
-      extBlockToLocBlock.put(b.getBlock(), b);
+      blockIdToLocBlock.put(b.getBlock().getBlockId(), b);
     }
 
     // Initialize the mapping of blocks -> list of VolumeIds, one per replica
@@ -200,9 +215,8 @@ class BlockStorageLocationUtil {
         new HashMap<LocatedBlock, List<VolumeId>>();
     for (LocatedBlock b : blocks) {
       ArrayList<VolumeId> l = new ArrayList<VolumeId>(b.getLocations().length);
-      // Start off all IDs as invalid, fill it in later with results from RPCs
       for (int i = 0; i < b.getLocations().length; i++) {
-        l.add(VolumeId.INVALID_VOLUME_ID);
+        l.add(null);
       }
       blockVolumeIds.put(b, l);
     }
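
Note the semantic shift above: replica slots now start as null instead of VolumeId.INVALID_VOLUME_ID, so callers detect "no answer from this datanode" with a null check rather than a sentinel comparison. A hypothetical consumer-side helper (the method is illustrative; the map shape matches what associateVolumeIdsWithBlocks returns):

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.fs.VolumeId;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    class VolumeIdChecks {
      // True iff every replica of every block received volume information.
      static boolean allVolumesKnown(Map<LocatedBlock, List<VolumeId>> ids) {
        for (List<VolumeId> replicas : ids.values()) {
          for (VolumeId id : replicas) {
            if (id == null) {  // slot never filled by a DN response
              return false;
            }
          }
        }
        return true;
      }
    }
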
@@ -210,27 +224,28 @@ class BlockStorageLocationUtil {
     // Iterate through the list of metadatas (one per datanode).
     // For each metadata, if it's valid, insert its volume location information
     // into the Map returned to the caller
-    Iterator<HdfsBlocksMetadata> metadatasIter = metadatas.iterator();
-    Iterator<DatanodeInfo> datanodeIter = datanodeBlocks.keySet().iterator();
-    while (metadatasIter.hasNext()) {
-      HdfsBlocksMetadata metadata = metadatasIter.next();
-      DatanodeInfo datanode = datanodeIter.next();
+    for (Map.Entry<DatanodeInfo, HdfsBlocksMetadata> entry : metadatas.entrySet()) {
+      DatanodeInfo datanode = entry.getKey();
+      HdfsBlocksMetadata metadata = entry.getValue();
       // Check if metadata is valid
       if (metadata == null) {
         continue;
       }
-      ExtendedBlock[] metaBlocks = metadata.getBlocks();
+      long[] metaBlockIds = metadata.getBlockIds();
       List<byte[]> metaVolumeIds = metadata.getVolumeIds();
       List<Integer> metaVolumeIndexes = metadata.getVolumeIndexes();
       // Add VolumeId for each replica in the HdfsBlocksMetadata
-      for (int j = 0; j < metaBlocks.length; j++) {
+      for (int j = 0; j < metaBlockIds.length; j++) {
         int volumeIndex = metaVolumeIndexes.get(j);
-        ExtendedBlock extBlock = metaBlocks[j];
+        long blockId = metaBlockIds[j];
         // Skip if block wasn't found, or not a valid index into metaVolumeIds
         // Also skip if the DN responded with a block we didn't ask for
         if (volumeIndex == Integer.MAX_VALUE
             || volumeIndex >= metaVolumeIds.size()
-            || !extBlockToLocBlock.containsKey(extBlock)) {
+            || !blockIdToLocBlock.containsKey(blockId)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("No data for block " + blockId);
+          }
           continue;
         }
         // Get the VolumeId by indexing into the list of VolumeIds
@@ -238,7 +253,7 @@ class BlockStorageLocationUtil {
         byte[] volumeId = metaVolumeIds.get(volumeIndex);
         HdfsVolumeId id = new HdfsVolumeId(volumeId);
         // Find out which index we are in the LocatedBlock's replicas
-        LocatedBlock locBlock = extBlockToLocBlock.get(extBlock);
+        LocatedBlock locBlock = blockIdToLocBlock.get(blockId);
         DatanodeInfo[] dnInfos = locBlock.getLocations();
         int index = -1;
         for (int k = 0; k < dnInfos.length; k++) {
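
The association loop above leans on HdfsBlocksMetadata being three parallel structures: getBlockIds()[j] names the j-th block, and getVolumeIndexes().get(j) indexes into getVolumeIds(), with Integer.MAX_VALUE as the "block not found" marker. A hedged decoding sketch, using only the accessors visible in this patch:

    import java.util.List;

    import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;

    class MetadataDump {
      // Illustrative decoder for a single datanode's response.
      static void dump(HdfsBlocksMetadata metadata) {
        long[] blockIds = metadata.getBlockIds();
        List<byte[]> volumeIds = metadata.getVolumeIds();
        List<Integer> volumeIndexes = metadata.getVolumeIndexes();
        for (int j = 0; j < blockIds.length; j++) {
          int idx = volumeIndexes.get(j);
          if (idx == Integer.MAX_VALUE || idx >= volumeIds.size()) {
            System.out.println("block " + blockIds[j] + ": volume unknown");
          } else {
            System.out.println("block " + blockIds[j] + ": volume id of "
                + volumeIds.get(idx).length + " bytes");
          }
        }
      }
    }
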
@@ -292,21 +307,23 @@ class BlockStorageLocationUtil {
   private static class VolumeBlockLocationCallable implements
       Callable<HdfsBlocksMetadata> {
 
-    private Configuration configuration;
-    private int timeout;
-    private DatanodeInfo datanode;
-    private List<ExtendedBlock> extendedBlocks;
-    private List<Token<BlockTokenIdentifier>> dnTokens;
-    private boolean connectToDnViaHostname;
+    private final Configuration configuration;
+    private final int timeout;
+    private final DatanodeInfo datanode;
+    private final String poolId;
+    private final long[] blockIds;
+    private final List<Token<BlockTokenIdentifier>> dnTokens;
+    private final boolean connectToDnViaHostname;
 
     VolumeBlockLocationCallable(Configuration configuration,
-        DatanodeInfo datanode, List<ExtendedBlock> extendedBlocks,
+        DatanodeInfo datanode, String poolId, long[] blockIds,
         List<Token<BlockTokenIdentifier>> dnTokens, int timeout,
         boolean connectToDnViaHostname) {
       this.configuration = configuration;
       this.timeout = timeout;
       this.datanode = datanode;
-      this.extendedBlocks = extendedBlocks;
+      this.poolId = poolId;
+      this.blockIds = blockIds;
       this.dnTokens = dnTokens;
       this.connectToDnViaHostname = connectToDnViaHostname;
     }
@@ -323,7 +340,7 @@ class BlockStorageLocationUtil {
       try {
         cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
             timeout, connectToDnViaHostname);
-        metadata = cdp.getHdfsBlocksMetadata(extendedBlocks, dnTokens);
+        metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens);
       } catch (IOException e) {
         // Bubble this up to the caller, handle with the Future
         throw e;
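
Taken together, the patched flow is: group replicas by datanode, fan the queries out in parallel, then fold the per-datanode answers back onto the original blocks. A hedged end-to-end sketch of a caller (everything here except the two BlockStorageLocationUtil signatures is illustrative, and since the utility is package-private such a helper would have to live in org.apache.hadoop.hdfs):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.VolumeId;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;

    class VolumeLocationFlow {
      // Hypothetical caller-side wiring; names and grouping logic are
      // illustrative, only the two utility signatures come from the patch.
      static Map<LocatedBlock, List<VolumeId>> locateVolumes(Configuration conf,
          List<LocatedBlock> blocks, int poolSize, int timeoutMs,
          boolean connectViaHostname) throws InvalidBlockTokenException {
        // Group replicas by the datanode that holds them.
        Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks =
            new HashMap<DatanodeInfo, List<LocatedBlock>>();
        for (LocatedBlock b : blocks) {
          for (DatanodeInfo dn : b.getLocations()) {
            List<LocatedBlock> perDn = datanodeBlocks.get(dn);
            if (perDn == null) {
              perDn = new ArrayList<LocatedBlock>();
              datanodeBlocks.put(dn, perDn);
            }
            perDn.add(b);
          }
        }
        // One parallel RPC per datanode, results keyed by DN ...
        Map<DatanodeInfo, HdfsBlocksMetadata> metadatas =
            BlockStorageLocationUtil.queryDatanodesForHdfsBlocksMetadata(
                conf, datanodeBlocks, poolSize, timeoutMs, connectViaHostname);
        // ... then per-replica VolumeIds (null where a DN did not answer).
        return BlockStorageLocationUtil.associateVolumeIdsWithBlocks(
            blocks, metadatas);
      }
    }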