|
@@ -50,7 +50,7 @@ import org.apache.hadoop.util.StringUtils;
|
|
|
*/
|
|
|
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
|
|
|
|
|
|
- public static final Log LOG = LogFactory.getLog(FileInputFormat.class);
|
|
|
+ private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
|
|
|
|
|
|
private static final double SPLIT_SLOP = 1.1; // 10% slop
|
|
|
|
|
@@ -128,8 +128,13 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
|
|
|
job.getConfiguration().setLong("mapred.min.split.size", size);
|
|
|
}
|
|
|
|
|
|
- public static long getMinSplitSize(Configuration conf) {
|
|
|
- return conf.getLong("mapred.min.split.size", 1L);
|
|
|
+ /**
|
|
|
+ * Get the minimum split size
|
|
|
+ * @param job the job
|
|
|
+ * @return the minimum number of bytes that can be in a split
|
|
|
+ */
|
|
|
+ public static long getMinSplitSize(JobContext job) {
|
|
|
+ return job.getConfiguration().getLong("mapred.min.split.size", 1L);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -142,8 +147,14 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
|
|
|
job.getConfiguration().setLong("mapred.max.split.size", size);
|
|
|
}
|
|
|
|
|
|
- public static long getMaxSplitSize(Configuration conf) {
|
|
|
- return conf.getLong("mapred.max.split.size", Long.MAX_VALUE);
|
|
|
+ /**
|
|
|
+ * Get the maximum split size.
|
|
|
+ * @param context the job to look at.
|
|
|
+ * @return the maximum number of bytes a split can include
|
|
|
+ */
|
|
|
+ public static long getMaxSplitSize(JobContext context) {
|
|
|
+ return context.getConfiguration().getLong("mapred.max.split.size",
|
|
|
+ Long.MAX_VALUE);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -151,7 +162,8 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
|
|
|
*
|
|
|
* @return the PathFilter instance set for the job, NULL if none has been set.
|
|
|
*/
|
|
|
- public static PathFilter getInputPathFilter(Configuration conf) {
|
|
|
+ public static PathFilter getInputPathFilter(JobContext context) {
|
|
|
+ Configuration conf = context.getConfiguration();
|
|
|
Class<?> filterClass = conf.getClass("mapred.input.pathFilter.class", null,
|
|
|
PathFilter.class);
|
|
|
return (filterClass != null) ?
|
|
@@ -166,8 +178,8 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
|
|
|
* @return array of FileStatus objects
|
|
|
* @throws IOException if zero items.
|
|
|
*/
|
|
|
- protected List<FileStatus> listStatus(Configuration job
|
|
|
- ) throws IOException {
|
|
|
+ protected List<FileStatus> listStatus(JobContext job
|
|
|
+ ) throws IOException {
|
|
|
List<FileStatus> result = new ArrayList<FileStatus>();
|
|
|
Path[] dirs = getInputPaths(job);
|
|
|
if (dirs.length == 0) {
|
|
@@ -188,7 +200,7 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
|
|
|
|
|
|
for (int i=0; i < dirs.length; ++i) {
|
|
|
Path p = dirs[i];
|
|
|
- FileSystem fs = p.getFileSystem(job);
|
|
|
+ FileSystem fs = p.getFileSystem(job.getConfiguration());
|
|
|
FileStatus[] matches = fs.globStatus(p, inputFilter);
|
|
|
if (matches == null) {
|
|
|
errors.add(new IOException("Input path does not exist: " + p));
|
|
@@ -216,11 +228,11 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
|
|
|
}
|
|
|
|
|
|
|
|
|
- /** Splits files returned by {@link #listStatus(Configuration)} when
|
|
|
- * they're too big.*/
|
|
|
- public List<InputSplit> getSplits(JobContext context
|
|
|
+ /**
|
|
|
+ * Generate the list of files and make them into FileSplits.
|
|
|
+ */
|
|
|
+ public List<InputSplit> getSplits(JobContext job
|
|
|
) throws IOException {
|
|
|
- Configuration job = context.getConfiguration();
|
|
|
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
|
|
|
long maxSize = getMaxSplitSize(job);
|
|
|
|
|
@@ -228,10 +240,10 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
|
|
|
List<InputSplit> splits = new ArrayList<InputSplit>();
|
|
|
for (FileStatus file: listStatus(job)) {
|
|
|
Path path = file.getPath();
|
|
|
- FileSystem fs = path.getFileSystem(context.getConfiguration());
|
|
|
+ FileSystem fs = path.getFileSystem(job.getConfiguration());
|
|
|
long length = file.getLen();
|
|
|
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
|
|
|
- if ((length != 0) && isSplitable(context, path)) {
|
|
|
+ if ((length != 0) && isSplitable(job, path)) {
|
|
|
long blockSize = file.getBlockSize();
|
|
|
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
|
|
|
|
|
@@ -391,11 +403,11 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
|
|
|
/**
|
|
|
* Get the list of input {@link Path}s for the map-reduce job.
|
|
|
*
|
|
|
- * @param conf The configuration of the job
|
|
|
+ * @param context The job
|
|
|
* @return the list of input {@link Path}s for the map-reduce job.
|
|
|
*/
|
|
|
- public static Path[] getInputPaths(Configuration conf) {
|
|
|
- String dirs = conf.get("mapred.input.dir", "");
|
|
|
+ public static Path[] getInputPaths(JobContext context) {
|
|
|
+ String dirs = context.getConfiguration().get("mapred.input.dir", "");
|
|
|
String [] list = StringUtils.split(dirs);
|
|
|
Path[] result = new Path[list.length];
|
|
|
for (int i = 0; i < list.length; i++) {
|