@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A base class for file-based {@link InputFormat}s.
+ *
+ * <p><code>FileInputFormat</code> is the base class for all file-based
+ * <code>InputFormat</code>s. This provides a generic implementation of
+ * {@link #getSplits(JobContext)}.
+ * Subclasses of <code>FileInputFormat</code> can also override the
+ * {@link #isSplitable(JobContext, Path)} method to ensure input files are
+ * not split up and are processed as a whole by {@link Mapper}s.
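+ *
+ * <p>For example, a minimal concrete subclass might look like the sketch
+ * below (illustrative only; it assumes the {@link LineRecordReader} from
+ * this package is an appropriate record reader):
+ *
+ * <pre>
+ * public class MyTextInputFormat
+ *     extends FileInputFormat&lt;LongWritable, Text&gt; {
+ *   public RecordReader&lt;LongWritable, Text&gt; createRecordReader(
+ *       InputSplit split, TaskAttemptContext context) {
+ *     return new LineRecordReader();
+ *   }
+ * }
+ * </pre>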
+ */
+public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
+
+  public static final Log LOG = LogFactory.getLog(FileInputFormat.class);
+
+  private static final double SPLIT_SLOP = 1.1;   // 10% slop
+
+  private static final PathFilter hiddenFileFilter = new PathFilter(){
+      public boolean accept(Path p){
+        String name = p.getName();
+        return !name.startsWith("_") && !name.startsWith(".");
+      }
+    };
+
+  /**
+   * Proxy PathFilter that accepts a path only if all filters given in the
+   * constructor do. Used by listStatus() to apply the built-in
+   * hiddenFileFilter together with a user-provided one (if any).
+   */
+  private static class MultiPathFilter implements PathFilter {
+    private List<PathFilter> filters;
+
+    public MultiPathFilter(List<PathFilter> filters) {
+      this.filters = filters;
+    }
+
+    public boolean accept(Path path) {
+      for (PathFilter filter : filters) {
+        if (!filter.accept(path)) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Get the lower bound on split size imposed by the format.
+   * @return the number of bytes of the minimal split for this format
+   */
+  protected long getFormatMinSplitSize() {
+    return 1;
+  }
+
+  /**
+   * Is the given filename splitable? Usually true, but if the file is
+   * stream compressed, it will not be.
+   *
+   * <code>FileInputFormat</code> implementations can override this and
+   * return <code>false</code> to ensure that individual input files are
+   * never split up, so that {@link Mapper}s process entire files.
+   *
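+   * <p>A typical override, sketched below, refuses to split compressed
+   * files (illustrative only; it assumes
+   * org.apache.hadoop.io.compress.CompressionCodecFactory is an acceptable
+   * way to detect compression for the files at hand):
+   *
+   * <pre>
+   * protected boolean isSplitable(JobContext context, Path file) {
+   *   CompressionCodec codec = new CompressionCodecFactory(
+   *       context.getConfiguration()).getCodec(file);
+   *   return codec == null; // split only uncompressed files
+   * }
+   * </pre>
+   *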
+   * @param context the job context
+   * @param filename the file name to check
+   * @return is this file splitable?
+   */
+  protected boolean isSplitable(JobContext context, Path filename) {
+    return true;
+  }
+
+  /**
+   * Set a PathFilter to be applied to the input paths for the map-reduce job.
+   *
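+   * <p>For example, with a user-defined filter class (<code>MyFilter</code>
+   * is hypothetical):
+   * <pre>
+   * FileInputFormat.setInputPathFilter(conf, MyFilter.class);
+   * </pre>
+   *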
+   * @param conf the job configuration
+   * @param filter the PathFilter class used for filtering the input paths.
+   */
+  public static void setInputPathFilter(Configuration conf,
+                                        Class<? extends PathFilter> filter) {
+    conf.setClass("mapred.input.pathFilter.class", filter, PathFilter.class);
+  }
+
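+  /**
+   * Set the minimum input split size.
+   *
+   * @param conf the job configuration
+   * @param size the minimum split size, in bytes
+   */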
+  public static void setMinInputSplitSize(Configuration conf,
+                                          long size) {
+    conf.setLong("mapred.min.split.size", size);
+  }
+
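+  /**
+   * Get the minimum split size.
+   *
+   * @param conf the job configuration
+   * @return the minimum number of bytes that can be in a split
+   */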
+  public static long getMinSplitSize(Configuration conf) {
+    return conf.getLong("mapred.min.split.size", 1L);
+  }
+
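+  /**
+   * Set the maximum input split size.
+   *
+   * @param conf the job configuration
+   * @param size the maximum split size, in bytes
+   */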
+  public static void setMaxInputSplitSize(Configuration conf,
+                                          long size) {
+    conf.setLong("mapred.max.split.size", size);
+  }
+
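+  /**
+   * Get the maximum split size.
+   *
+   * @param conf the job configuration
+   * @return the maximum number of bytes a split can include
+   */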
+  public static long getMaxSplitSize(Configuration conf) {
+    return conf.getLong("mapred.max.split.size", Long.MAX_VALUE);
+  }
+
+  /**
+   * Get a PathFilter instance of the filter set for the input paths.
+   *
+   * @param conf the job configuration
+   * @return the PathFilter instance set for the job, null if none has been set.
+   */
+  public static PathFilter getInputPathFilter(Configuration conf) {
+    Class<?> filterClass = conf.getClass("mapred.input.pathFilter.class", null,
+        PathFilter.class);
+    return (filterClass != null) ?
+        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
+  }
+
+  /** List input directories.
+   * Subclasses may override to, e.g., select only files matching a regular
+   * expression.
+   *
+   * @param job the configuration for the job
+   * @return the list of FileStatus objects for the input files
+   * @throws IOException if no input paths are specified, or if an input
+   *         path does not exist or matches no files.
+   */
+  protected List<FileStatus> listStatus(Configuration job
+                                        ) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    Path[] dirs = getInputPaths(job);
+    if (dirs.length == 0) {
+      throw new IOException("No input paths specified in job");
+    }
+
+    List<IOException> errors = new ArrayList<IOException>();
+
+    // creates a MultiPathFilter with the hiddenFileFilter and the
+    // user provided one (if any).
+    List<PathFilter> filters = new ArrayList<PathFilter>();
+    filters.add(hiddenFileFilter);
+    PathFilter jobFilter = getInputPathFilter(job);
+    if (jobFilter != null) {
+      filters.add(jobFilter);
+    }
+    PathFilter inputFilter = new MultiPathFilter(filters);
+
+    for (int i = 0; i < dirs.length; ++i) {
+      Path p = dirs[i];
+      FileSystem fs = p.getFileSystem(job);
+      FileStatus[] matches = fs.globStatus(p, inputFilter);
+      if (matches == null) {
+        errors.add(new IOException("Input path does not exist: " + p));
+      } else if (matches.length == 0) {
+        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
+      } else {
+        for (FileStatus globStat: matches) {
+          if (globStat.isDir()) {
+            for (FileStatus stat: fs.listStatus(globStat.getPath(),
+                inputFilter)) {
+              result.add(stat);
+            }
+          } else {
+            result.add(globStat);
+          }
+        }
+      }
+    }
+
+    if (!errors.isEmpty()) {
+      throw new InvalidInputException(errors);
+    }
+    LOG.info("Total input paths to process : " + result.size());
+    return result;
+  }
+
+
+  /** Generate splits for the files returned by
+   * {@link #listStatus(Configuration)}, breaking up any file larger than
+   * the computed split size into multiple splits. */
+  public List<InputSplit> getSplits(JobContext context
+                                    ) throws IOException {
+    Configuration job = context.getConfiguration();
+    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
+    long maxSize = getMaxSplitSize(job);
+
+    // generate splits
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    for (FileStatus file: listStatus(job)) {
+      Path path = file.getPath();
+      FileSystem fs = path.getFileSystem(context.getConfiguration());
+      long length = file.getLen();
+      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+      if ((length != 0) && isSplitable(context, path)) {
+        long blockSize = file.getBlockSize();
+        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
+
+        long bytesRemaining = length;
+        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
+          int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+          splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
+                                   blkLocations[blkIndex].getHosts()));
+          bytesRemaining -= splitSize;
+        }
+
+        if (bytesRemaining != 0) {
+          splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
+                     blkLocations[blkLocations.length - 1].getHosts()));
+        }
+      } else if (length != 0) {
+        splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
+      } else {
+        // Create empty hosts array for zero length files
+        splits.add(new FileSplit(path, 0, length, new String[0]));
+      }
+    }
+    LOG.debug("Total # of splits: " + splits.size());
+    return splits;
+  }
+
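+  /**
+   * Compute the size of splits for a file as
+   * <code>max(minSize, min(maxSize, blockSize))</code>: splits default to
+   * the file's block size unless the configured bounds override it.
+   *
+   * @param blockSize the block size of the file
+   * @param minSize the minimum split size
+   * @param maxSize the maximum split size
+   * @return the number of bytes in each split
+   */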
+  protected long computeSplitSize(long blockSize, long minSize,
+                                  long maxSize) {
+    return Math.max(minSize, Math.min(maxSize, blockSize));
+  }
+
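+  /**
+   * Find the block that contains the given offset into the file.
+   *
+   * @param blkLocations the file's block locations, in file order
+   * @param offset the byte offset into the file
+   * @return the index of the block containing the offset
+   * @throws IllegalArgumentException if the offset is beyond the last block
+   */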
+  protected int getBlockIndex(BlockLocation[] blkLocations,
+                              long offset) {
+    for (int i = 0; i < blkLocations.length; i++) {
+      // is the offset inside this block?
+      if ((blkLocations[i].getOffset() <= offset) &&
+          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
+        return i;
+      }
+    }
+    BlockLocation last = blkLocations[blkLocations.length - 1];
+    long fileLength = last.getOffset() + last.getLength() - 1;
+    throw new IllegalArgumentException("Offset " + offset +
+                                       " is outside of file (0.." +
+                                       fileLength + ")");
+  }
+
+  /**
+   * Sets the given comma separated paths as the list of inputs
+   * for the map-reduce job.
+   *
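+   * <p>For example (illustrative paths):
+   * <pre>
+   * FileInputFormat.setInputPaths(conf, "/data/2008,/logs/{jan,feb}");
+   * </pre>
+   *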
+   * @param conf Configuration of the job
+   * @param commaSeparatedPaths Comma separated paths to be set as
+   *        the list of inputs for the map-reduce job.
+   */
+  public static void setInputPaths(Configuration conf,
+                                   String commaSeparatedPaths
+                                   ) throws IOException {
+    setInputPaths(conf, StringUtils.stringToPath(
+                        getPathStrings(commaSeparatedPaths)));
+  }
+
+  /**
+   * Add the given comma separated paths to the list of inputs for
+   * the map-reduce job.
+   *
+   * @param conf The configuration of the job
+   * @param commaSeparatedPaths Comma separated paths to be added to
+   *        the list of inputs for the map-reduce job.
+   */
+  public static void addInputPaths(Configuration conf,
+                                   String commaSeparatedPaths
+                                   ) throws IOException {
+    for (String str : getPathStrings(commaSeparatedPaths)) {
+      addInputPath(conf, new Path(str));
+    }
+  }
+
+  /**
+   * Set the array of {@link Path}s as the list of inputs
+   * for the map-reduce job.
+   *
+   * @param conf Configuration of the job.
+   * @param inputPaths the {@link Path}s of the input directories/files
+   *        for the map-reduce job.
+   */
+  public static void setInputPaths(Configuration conf,
+                                   Path... inputPaths) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    Path path = inputPaths[0].makeQualified(fs);
+    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
+    for (int i = 1; i < inputPaths.length; i++) {
+      str.append(StringUtils.COMMA_STR);
+      path = inputPaths[i].makeQualified(fs);
+      str.append(StringUtils.escapeString(path.toString()));
+    }
+    conf.set("mapred.input.dir", str.toString());
+  }
+
+  /**
+   * Add a {@link Path} to the list of inputs for the map-reduce job.
+   *
+   * @param conf The configuration of the job
+   * @param path {@link Path} to be added to the list of inputs for
+   *        the map-reduce job.
+   */
+  public static void addInputPath(Configuration conf,
+                                  Path path) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    path = path.makeQualified(fs);
+    String dirStr = StringUtils.escapeString(path.toString());
+    String dirs = conf.get("mapred.input.dir");
+    conf.set("mapred.input.dir", dirs == null ? dirStr : dirs + "," + dirStr);
+  }
+
+  // Splits a comma-separated list of path strings into separate paths,
+  // treating commas inside curly-brace glob patterns as part of the path:
+  // e.g. "/a/{b,c},/d" yields "/a/{b,c}" and "/d".
+  private static String[] getPathStrings(String commaSeparatedPaths) {
+    int length = commaSeparatedPaths.length();
+    int curlyOpen = 0;
+    int pathStart = 0;
+    boolean globPattern = false;
+    List<String> pathStrings = new ArrayList<String>();
+
+    for (int i = 0; i < length; i++) {
+      char ch = commaSeparatedPaths.charAt(i);
+      switch(ch) {
+        case '{' : {
+          curlyOpen++;
+          if (!globPattern) {
+            globPattern = true;
+          }
+          break;
+        }
+        case '}' : {
+          curlyOpen--;
+          if (curlyOpen == 0 && globPattern) {
+            globPattern = false;
+          }
+          break;
+        }
+        case ',' : {
+          if (!globPattern) {
+            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
+            pathStart = i + 1;
+          }
+          break;
+        }
+      }
+    }
+    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
+
+    return pathStrings.toArray(new String[0]);
+  }
+
+  /**
+   * Get the list of input {@link Path}s for the map-reduce job.
+   *
+   * @param conf The configuration of the job
+   * @return the list of input {@link Path}s for the map-reduce job.
+   */
+  public static Path[] getInputPaths(Configuration conf) {
+    String dirs = conf.get("mapred.input.dir", "");
+    String[] list = StringUtils.split(dirs);
+    Path[] result = new Path[list.length];
+    for (int i = 0; i < list.length; i++) {
+      result[i] = new Path(StringUtils.unEscapeString(list[i]));
+    }
+    return result;
+  }
+
+}