@@ -0,0 +1,626 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.NetworkTopology;
+
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * An abstract {@link org.apache.hadoop.mapred.InputFormat} that returns {@link CombineFileSplit}'s
+ * in the {@link org.apache.hadoop.mapred.InputFormat#getSplits(JobConf, int)} method.
+ * Splits are constructed from the files under the input paths.
+ * A split cannot have files from different pools.
+ * Each split returned may contain blocks from different files.
+ * If a maxSplitSize is specified, then blocks on the same node are
+ * combined to form a single split. Blocks that are left over are
+ * then combined with other blocks in the same rack.
+ * If maxSplitSize is not specified, then blocks from the same rack
+ * are combined in a single split; no attempt is made to create
+ * node-local splits.
+ * If the maxSplitSize is equal to the block size, then this class
+ * is similar to the default splitting behaviour in Hadoop: each
+ * block is a locally processed split.
+ * Subclasses implement {@link org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit, JobConf, Reporter)}
+ * to construct <code>RecordReader</code>'s for <code>CombineFileSplit</code>'s.
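+ * A typical subclass only needs to supply the record reader; a minimal
+ * sketch (<code>MyKey</code>, <code>MyValue</code> and
+ * <code>MyRecordReader</code> are illustrative placeholders, not part
+ * of this patch):
+ * <pre>
+ * public class MyInputFormat extends CombineFileInputFormat&lt;MyKey, MyValue&gt; {
+ *   public MyInputFormat() {
+ *     setMaxSplitSize(134217728); // pack blocks into splits of at most 128MB
+ *   }
+ *   public RecordReader&lt;MyKey, MyValue&gt; getRecordReader(
+ *       InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ *     return new MyRecordReader((CombineFileSplit) split, job, reporter);
+ *   }
+ * }
+ * </pre>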
+ * @see CombineFileSplit
+ */
+public abstract class CombineFileInputFormat<K, V>
+  extends FileInputFormat<K, V> {
+
+  // ability to limit the size of a single split
+  private long maxSplitSize = 0;
+  private long minSplitSizeNode = 0;
+  private long minSplitSizeRack = 0;
+
+  // The pools of input path filters. A split cannot have blocks from
+  // files across multiple pools.
+  private ArrayList<MultiPathFilter> pools = new ArrayList<MultiPathFilter>();
+
+  /**
+   * Specify the maximum size (in bytes) of each split. Each split is
+   * approximately the specified size.
+   */
+  protected void setMaxSplitSize(long maxSplitSize) {
+    this.maxSplitSize = maxSplitSize;
+  }
+
+  /**
+   * Specify the minimum size (in bytes) of each split per node.
+   * This applies to data that is left over after combining data on a single
+   * node into splits of at most the size specified by maxSplitSize.
+   * This leftover data is combined into its own split if its size
+   * exceeds minSplitSizeNode.
+   */
+  protected void setMinSplitSizeNode(long minSplitSizeNode) {
+    this.minSplitSizeNode = minSplitSizeNode;
+  }
+
+  /**
+   * Specify the minimum size (in bytes) of each split per rack.
+   * This applies to data that is left over after combining data on a single
+   * rack into splits of at most the size specified by maxSplitSize.
+   * This leftover data is combined into its own split if its size
+   * exceeds minSplitSizeRack.
+   */
+  protected void setMinSplitSizeRack(long minSplitSizeRack) {
+    this.minSplitSizeRack = minSplitSizeRack;
+  }
+
+  /**
+   * Create a new pool and add the filters to it.
+   * A split cannot have files from different pools.
+   */
+  protected void createPool(JobConf conf, List<PathFilter> filters) {
+    pools.add(new MultiPathFilter(filters));
+  }
+
+  /**
+   * Create a new pool and add the filters to it.
+   * A pathname can satisfy any one of the specified filters.
+   * A split cannot have files from different pools.
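+   * For example, a subclass could route every path ending in ".txt"
+   * into a pool of its own (an illustrative sketch, not required usage):
+   * <pre>
+   * createPool(conf, new PathFilter() {
+   *     public boolean accept(Path path) {
+   *       return path.getName().endsWith(".txt");
+   *     }
+   *   });
+   * </pre>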
+   */
+  protected void createPool(JobConf conf, PathFilter... filters) {
+    MultiPathFilter multi = new MultiPathFilter();
+    for (PathFilter f: filters) {
+      multi.add(f);
+    }
+    pools.add(multi);
+  }
+
+  /**
+   * default constructor
+   */
+  public CombineFileInputFormat() {
+  }
+
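+  /**
+   * Builds the splits, one pool at a time. The sizes set through
+   * setMaxSplitSize(), setMinSplitSizeNode() and setMinSplitSizeRack()
+   * take precedence; when unset, the values are read from the config
+   * keys mapred.max.split.size, mapred.min.split.size.per.node and
+   * mapred.min.split.size.per.rack (a value of 0 leaves the limit unset).
+   */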
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits)
+    throws IOException {
+
+    long minSizeNode = 0;
+    long minSizeRack = 0;
+    long maxSize = 0;
+
+    // the values specified by setXxxSplitSize() take precedence over the
+    // values that might have been specified in the config
+    if (minSplitSizeNode != 0) {
+      minSizeNode = minSplitSizeNode;
+    } else {
+      minSizeNode = job.getLong("mapred.min.split.size.per.node", 0);
+    }
+    if (minSplitSizeRack != 0) {
+      minSizeRack = minSplitSizeRack;
+    } else {
+      minSizeRack = job.getLong("mapred.min.split.size.per.rack", 0);
+    }
+    if (maxSplitSize != 0) {
+      maxSize = maxSplitSize;
+    } else {
+      maxSize = job.getLong("mapred.max.split.size", 0);
+    }
+    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
+      throw new IOException("Minimum split size per node " + minSizeNode +
+                            " cannot be larger than maximum split size " +
+                            maxSize);
+    }
+    if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
+      throw new IOException("Minimum split size per rack " + minSizeRack +
+                            " cannot be larger than maximum split size " +
+                            maxSize);
+    }
+    if (minSizeRack != 0 && minSizeNode > minSizeRack) {
+      throw new IOException("Minimum split size per node " + minSizeNode +
+                            " cannot be smaller than minimum split size per rack " +
+                            minSizeRack);
+    }
+
+    // all the files in the input set
+    Path[] paths = FileUtil.stat2Paths(listStatus(job));
+    List<CombineFileSplit> splits = new ArrayList<CombineFileSplit>();
+    if (paths.length == 0) {
+      return splits.toArray(new CombineFileSplit[splits.size()]);
+    }
+
+    // In one iteration, process all the paths in a single pool.
+    // Processing one pool at a time ensures that a split contains paths
+    // from a single pool only.
+    for (MultiPathFilter onepool : pools) {
+      ArrayList<Path> myPaths = new ArrayList<Path>();
+
+      // pick one input path. If it matches a pool (i.e. any one of the
+      // pool's filters accepts it), add it to the output set
+      for (int i = 0; i < paths.length; i++) {
+        if (paths[i] == null) { // already processed
+          continue;
+        }
+        FileSystem fs = paths[i].getFileSystem(job);
+        Path p = new Path(paths[i].toUri().getPath());
+        if (onepool.accept(p)) {
+          myPaths.add(paths[i]); // add it to my output set
+          paths[i] = null; // already processed
+        }
+      }
+      // create splits for all files in this pool.
+      getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
+                    maxSize, minSizeNode, minSizeRack, splits);
+    }
+
+    // Finally, process all paths that do not belong to any pool.
+    ArrayList<Path> myPaths = new ArrayList<Path>();
+    for (int i = 0; i < paths.length; i++) {
+      if (paths[i] == null) { // already processed
+        continue;
+      }
+      myPaths.add(paths[i]);
+    }
+    // create splits for all files that are not in any pool.
+    getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
+                  maxSize, minSizeNode, minSizeRack, splits);
+
+    return splits.toArray(new CombineFileSplit[splits.size()]);
+  }
+
+  /**
+   * Return all the splits in the specified set of paths.
+   */
+  private void getMoreSplits(JobConf job, Path[] paths,
+                             long maxSize, long minSizeNode, long minSizeRack,
+                             List<CombineFileSplit> splits)
+    throws IOException {
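+    // Overview: first pack node-local blocks into splits of up to maxSize
+    // bytes, turning leftovers of at least minSizeNode into splits of
+    // their own. Remaining blocks are then packed rack by rack, and
+    // blocks still below minSizeRack are collected into an overflow
+    // list that is combined into splits at the very end.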
+
+    // all blocks for all the files in the input set
+    OneFileInfo[] files;
+
+    // mapping from a rack name to the list of blocks it has
+    HashMap<String, List<OneBlockInfo>> rackToBlocks =
+      new HashMap<String, List<OneBlockInfo>>();
+
+    // mapping from a block to the nodes on which it has replicas
+    HashMap<OneBlockInfo, String[]> blockToNodes =
+      new HashMap<OneBlockInfo, String[]>();
+
+    // mapping from a node to the list of blocks that it contains
+    HashMap<String, List<OneBlockInfo>> nodeToBlocks =
+      new HashMap<String, List<OneBlockInfo>>();
+
+    files = new OneFileInfo[paths.length];
+    if (paths.length == 0) {
+      return;
+    }
+
+    // populate all the blocks for all files
+    long totLength = 0;
+    for (int i = 0; i < paths.length; i++) {
+      files[i] = new OneFileInfo(paths[i], job,
+                                 rackToBlocks, blockToNodes, nodeToBlocks);
+      totLength += files[i].getLength();
+    }
+
+    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
+    ArrayList<String> nodes = new ArrayList<String>();
+    long curSplitSize = 0;
+
+    // process all nodes and create splits that are local
+    // to a node.
+    for (Iterator<Map.Entry<String,
+         List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
+         iter.hasNext();) {
+
+      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+      nodes.add(one.getKey());
+      List<OneBlockInfo> blocksInNode = one.getValue();
+
+      // for each block, copy it into validBlocks. Delete it from
+      // blockToNodes so that the same block does not appear in
+      // two different splits.
+      for (OneBlockInfo oneblock : blocksInNode) {
+        if (blockToNodes.containsKey(oneblock)) {
+          validBlocks.add(oneblock);
+          blockToNodes.remove(oneblock);
+          curSplitSize += oneblock.length;
+
+          // if the accumulated split size exceeds the maximum, then
+          // create this split.
+          if (maxSize != 0 && curSplitSize >= maxSize) {
+            // create an input split and add it to the splits array
+            addCreatedSplit(job, splits, nodes, validBlocks);
+            curSplitSize = 0;
+            validBlocks.clear();
+          }
+        }
+      }
+      // if there were any blocks left over and their combined size is
+      // larger than minSizeNode, then combine them into one split.
+      // Otherwise add them back to the unprocessed pool. It is likely
+      // that they will be combined with other blocks from the same rack later on.
+      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
+        // create an input split and add it to the splits array
+        addCreatedSplit(job, splits, nodes, validBlocks);
+      } else {
+        for (OneBlockInfo oneblock : validBlocks) {
+          blockToNodes.put(oneblock, oneblock.hosts);
+        }
+      }
+      validBlocks.clear();
+      nodes.clear();
+      curSplitSize = 0;
+    }
+
+    // if blocks in a rack are below the specified minimum size, then keep them
+    // in 'overflow'. After the processing of all racks is complete, these overflow
+    // blocks will be combined into splits.
+    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
+    ArrayList<String> racks = new ArrayList<String>();
+
+    // Process all racks over and over again until there is no more work to do.
+    while (blockToNodes.size() > 0) {
+
+      // Create one split for this rack before moving over to the next rack.
+      // Come back to this rack after creating a single split for each of the
+      // remaining racks.
+      // Process one rack location at a time; combine all possible blocks that
+      // reside on this rack as one split (constrained by the minimum and
+      // maximum split size).
+
+      // iterate over all racks
+      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
+           rackToBlocks.entrySet().iterator(); iter.hasNext();) {
+
+        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+        racks.add(one.getKey());
+        List<OneBlockInfo> blocks = one.getValue();
+
+        // for each block, copy it into validBlocks. Delete it from
+        // blockToNodes so that the same block does not appear in
+        // two different splits.
+        boolean createdSplit = false;
+        for (OneBlockInfo oneblock : blocks) {
+          if (blockToNodes.containsKey(oneblock)) {
+            validBlocks.add(oneblock);
+            blockToNodes.remove(oneblock);
+            curSplitSize += oneblock.length;
+
+            // if the accumulated split size exceeds the maximum, then
+            // create this split.
+            if (maxSize != 0 && curSplitSize >= maxSize) {
+              // create an input split and add it to the splits array
+              addCreatedSplit(job, splits, racks, validBlocks);
+              createdSplit = true;
+              break;
+            }
+          }
+        }
+
+        // if we created a split, then just go to the next rack
+        if (createdSplit) {
+          curSplitSize = 0;
+          validBlocks.clear();
+          racks.clear();
+          continue;
+        }
+
+        if (!validBlocks.isEmpty()) {
+          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
+            // if there is a minimum size specified, then create a single split
+            // otherwise, store these blocks into overflow data structure
+            addCreatedSplit(job, splits, racks, validBlocks);
+          } else {
+            // There were a few blocks in this rack that remained to be processed.
+            // Keep them in 'overflow' block list. These will be combined later.
+            overflowBlocks.addAll(validBlocks);
+          }
+        }
+        curSplitSize = 0;
+        validBlocks.clear();
+        racks.clear();
+      }
+    }
+
+    assert blockToNodes.isEmpty();
+    assert curSplitSize == 0;
+    assert validBlocks.isEmpty();
+    assert racks.isEmpty();
+
+    // Process all overflow blocks
+    for (OneBlockInfo oneblock : overflowBlocks) {
+      validBlocks.add(oneblock);
+      curSplitSize += oneblock.length;
+
+      // This might cause an existing rack location to be re-added,
+      // but it should be ok.
+      for (int i = 0; i < oneblock.racks.length; i++) {
+        racks.add(oneblock.racks[i]);
+      }
+
+      // if the accumulated split size exceeds the maximum, then
+      // create this split.
+      if (maxSize != 0 && curSplitSize >= maxSize) {
+        // create an input split and add it to the splits array
+        addCreatedSplit(job, splits, racks, validBlocks);
+        curSplitSize = 0;
+        validBlocks.clear();
+        racks.clear();
+      }
+    }
+
+    // Process any blocks that still remain.
+    if (!validBlocks.isEmpty()) {
+      addCreatedSplit(job, splits, racks, validBlocks);
+    }
+  }
+
+  /**
+   * Create a single split from the list of blocks specified in validBlocks.
+   * Add this new split into splitList.
+   */
+  private void addCreatedSplit(JobConf job,
+                               List<CombineFileSplit> splitList,
+                               List<String> racks,
+                               ArrayList<OneBlockInfo> validBlocks) {
+    // create an input split
+    Path[] fl = new Path[validBlocks.size()];
+    long[] offset = new long[validBlocks.size()];
+    long[] length = new long[validBlocks.size()];
+    String[] rackLocations = racks.toArray(new String[racks.size()]);
+    for (int i = 0; i < validBlocks.size(); i++) {
+      fl[i] = validBlocks.get(i).onepath;
+      offset[i] = validBlocks.get(i).offset;
+      length[i] = validBlocks.get(i).length;
+    }
+
+    // add this split to the list that is returned
+    CombineFileSplit thissplit = new CombineFileSplit(job, fl, offset,
+                                                      length, rackLocations);
+    splitList.add(thissplit);
+  }
+
+  /**
+   * Subclasses implement this to create RecordReaders for CombineFileSplits.
+   */
+  public abstract RecordReader<K, V> getRecordReader(InputSplit split,
+                                                     JobConf job, Reporter reporter)
+    throws IOException;
+
+  /**
+   * information about one file from the File System
+   */
+  private static class OneFileInfo {
+    private long fileSize;         // size of the file
+    private OneBlockInfo[] blocks; // all blocks in this file
+
+    OneFileInfo(Path path, JobConf job,
+                HashMap<String, List<OneBlockInfo>> rackToBlocks,
+                HashMap<OneBlockInfo, String[]> blockToNodes,
+                HashMap<String, List<OneBlockInfo>> nodeToBlocks)
+      throws IOException {
+      this.fileSize = 0;
+
+      // get block locations from file system
+      FileSystem fs = path.getFileSystem(job);
+      FileStatus stat = fs.getFileStatus(path);
+      BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
+                                                           stat.getLen());
+      // create a list of all blocks and their locations
+      if (locations == null) {
+        blocks = new OneBlockInfo[0];
+      } else {
+        blocks = new OneBlockInfo[locations.length];
+        for (int i = 0; i < locations.length; i++) {
+
+          fileSize += locations[i].getLength();
+          OneBlockInfo oneblock = new OneBlockInfo(path,
+                                                   locations[i].getOffset(),
+                                                   locations[i].getLength(),
+                                                   locations[i].getHosts(),
+                                                   locations[i].getTopologyPaths());
+          blocks[i] = oneblock;
+
+          // add this block to the block --> node locations map
+          blockToNodes.put(oneblock, oneblock.hosts);
+
+          // add this block to the rack --> block map
+          for (int j = 0; j < oneblock.racks.length; j++) {
+            String rack = oneblock.racks[j];
+            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              rackToBlocks.put(rack, blklist);
+            }
+            blklist.add(oneblock);
+          }
+
+          // add this block to the node --> block map
+          for (int j = 0; j < oneblock.hosts.length; j++) {
+            String node = oneblock.hosts[j];
+            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+            if (blklist == null) {
+              blklist = new ArrayList<OneBlockInfo>();
+              nodeToBlocks.put(node, blklist);
+            }
+            blklist.add(oneblock);
+          }
+        }
+      }
+    }
+
+    long getLength() {
+      return fileSize;
+    }
+
+    OneBlockInfo[] getBlocks() {
+      return blocks;
+    }
+  }
+
+  /**
+   * information about one block from the File System
+   */
+  private static class OneBlockInfo {
+    Path onepath;   // name of this file
+    long offset;    // offset in file
+    long length;    // length of this block
+    String[] hosts; // nodes on which this block resides
+    String[] racks; // network topology of hosts
+
+    OneBlockInfo(Path path, long offset, long len,
+                 String[] hosts, String[] topologyPaths) {
+      this.onepath = path;
+      this.offset = offset;
+      this.hosts = hosts;
+      this.length = len;
+      assert (hosts.length == topologyPaths.length ||
+              topologyPaths.length == 0);
+
+      // if the file system does not have any rack information, then
+      // use dummy rack location.
+      if (topologyPaths.length == 0) {
+        topologyPaths = new String[hosts.length];
+        for (int i = 0; i < topologyPaths.length; i++) {
+          topologyPaths[i] = (new NodeBase(hosts[i], NetworkTopology.DEFAULT_RACK)).
+                             toString();
+        }
+      }
+
+      // The topology paths have the host name included as the last
+      // component. Strip it.
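+      // e.g. a topology path of "/rack1/host1" becomes the rack "/rack1".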
+      this.racks = new String[topologyPaths.length];
+      for (int i = 0; i < topologyPaths.length; i++) {
+        this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
+      }
+    }
+  }
+
+  /**
+   * Accept a path only if any one of the filters given in the
+   * constructor does.
+   */
+  private static class MultiPathFilter implements PathFilter {
+    private List<PathFilter> filters;
+
+    public MultiPathFilter() {
+      this.filters = new ArrayList<PathFilter>();
+    }
+
+    public MultiPathFilter(List<PathFilter> filters) {
+      this.filters = filters;
+    }
+
+    public void add(PathFilter one) {
+      filters.add(one);
+    }
+
+    public boolean accept(Path path) {
+      for (PathFilter filter : filters) {
+        if (filter.accept(path)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public String toString() {
+      StringBuffer buf = new StringBuffer();
+      buf.append("[");
+      for (PathFilter f: filters) {
+        buf.append(f);
+        buf.append(",");
+      }
+      buf.append("]");
+      return buf.toString();
+    }
+  }
+}