|
@@ -24,13 +24,19 @@ import java.util.List;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.PathFilter;
|
|
|
+import org.apache.hadoop.io.compress.CompressionCodec;
|
|
|
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
|
|
|
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
|
|
|
import org.apache.hadoop.mapred.InputFormat;
|
|
|
import org.apache.hadoop.mapred.InputSplit;
|
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
|
import org.apache.hadoop.mapred.RecordReader;
|
|
|
import org.apache.hadoop.mapred.Reporter;
|
|
|
import org.apache.hadoop.mapreduce.Job;
|
|
|
+import org.apache.hadoop.mapreduce.JobContext;
|
|
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
|
|
|
|
|
/**
|
|
@@ -127,4 +133,34 @@ public abstract class CombineFileInputFormat<K, V>
|
|
|
return result.toArray(new FileStatus[result.size()]);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Reports whether {@code file} may be split by delegating to
|
|
|
+ * {@link #isSplitable(FileSystem, Path)}.
|
|
|
+ * <p>
|
|
|
+ * Subclasses should avoid overriding this method and should instead only
|
|
|
+ * override {@link #isSplitable(FileSystem, Path)}; this implementation
|
|
|
+ * simply calls that method to preserve compatibility.
|
|
|
+ * <p>
|
|
|
+ * The {@link FileSystem} passed to the delegate is obtained from the job
|
|
|
+ * configuration alone, via
|
|
|
+ * {@code FileSystem.get(context.getConfiguration())}. An
|
|
|
+ * {@code IOException} raised while obtaining it is rethrown wrapped in a
|
|
|
+ * {@code RuntimeException}, since this signature declares no checked
|
|
|
+ * exception.
|
|
|
+ * <p>
|
|
|
+ * NOTE(review): the file system is not derived from {@code file} itself;
|
|
|
+ * confirm this is intended for paths that live on another file system.
|
|
|
+ *
|
|
|
+ * @param context the job context whose configuration supplies the file
|
|
|
+ * system
|
|
|
+ * @param file the path of the file to check
|
|
|
+ * @return {@code true} if the file is splitable, {@code false} otherwise
|
|
|
+ * @throws RuntimeException if obtaining the file system fails with an
|
|
|
+ * {@code IOException}
|
|
|
+ * @see <a href="https://issues.apache.org/jira/browse/MAPREDUCE-5530">
|
|
|
+ * MAPREDUCE-5530</a>
|
|
|
+ */
|
|
|
+ @InterfaceAudience.Private
|
|
|
+ @Override
|
|
|
+ protected boolean isSplitable(JobContext context, Path file) {
|
|
|
+ try {
|
|
|
+ return isSplitable(FileSystem.get(context.getConfiguration()), file);
|
|
|
+ }
|
|
|
+ catch (IOException ioe) {
|
|
|
+ throw new RuntimeException(ioe);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * Reports whether {@code file} may be split, based on the compression
|
|
|
+ * codec found for it.
|
|
|
+ * <p>
|
|
|
+ * A {@link CompressionCodecFactory} built from {@code fs.getConf()} is
|
|
|
+ * asked for the codec of {@code file}. When no codec is returned the file
|
|
|
+ * is reported as splitable; otherwise it is splitable only if the codec is
|
|
|
+ * a {@link SplittableCompressionCodec}.
|
|
|
+ * <p>
|
|
|
+ * Subclasses that need different splitability rules override this
|
|
|
+ * overload.
|
|
|
+ *
|
|
|
+ * @param fs the file system whose configuration is used to build the codec
|
|
|
+ * factory
|
|
|
+ * @param file the path of the file to check
|
|
|
+ * @return {@code true} if no codec is found for the file or the codec is a
|
|
|
+ * {@link SplittableCompressionCodec}; {@code false} otherwise
|
|
|
+ */
|
|
|
+ protected boolean isSplitable(FileSystem fs, Path file) {
|
|
|
+ final CompressionCodec codec =
|
|
|
+ new CompressionCodecFactory(fs.getConf()).getCodec(file);
|
|
|
+ if (null == codec) {
|
|
|
+ return true; // no codec found for this file
|
|
|
+ }
|
|
|
+ return codec instanceof SplittableCompressionCodec;
|
|
|
+ }
|
|
|
}
|