|
@@ -57,7 +57,7 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
|
|
= "mapred.num.splits";
|
|
= "mapred.num.splits";
|
|
private static final String CONF_LABEL_NUM_ENTRIES_PER_CHUNK
|
|
private static final String CONF_LABEL_NUM_ENTRIES_PER_CHUNK
|
|
= "mapred.num.entries.per.chunk";
|
|
= "mapred.num.entries.per.chunk";
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Implementation of InputFormat::getSplits(). This method splits up the
|
|
* Implementation of InputFormat::getSplits(). This method splits up the
|
|
* copy-listing file into chunks, and assigns the first batch to different
|
|
* copy-listing file into chunks, and assigns the first batch to different
|
|
@@ -91,7 +91,7 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
|
|
// Setting non-zero length for FileSplit size, to avoid a possible
|
|
// Setting non-zero length for FileSplit size, to avoid a possible
|
|
// future when 0-sized file-splits are considered "empty" and skipped
|
|
// future when 0-sized file-splits are considered "empty" and skipped
|
|
// over.
|
|
// over.
|
|
- MIN_RECORDS_PER_CHUNK,
|
|
|
|
|
|
+ getMinRecordsPerChunk(jobContext.getConfiguration()),
|
|
null));
|
|
null));
|
|
}
|
|
}
|
|
DistCpUtils.publish(jobContext.getConfiguration(),
|
|
DistCpUtils.publish(jobContext.getConfiguration(),
|
|
@@ -107,9 +107,11 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
|
|
final Configuration configuration = context.getConfiguration();
|
|
final Configuration configuration = context.getConfiguration();
|
|
int numRecords = getNumberOfRecords(configuration);
|
|
int numRecords = getNumberOfRecords(configuration);
|
|
int numMaps = getNumMapTasks(configuration);
|
|
int numMaps = getNumMapTasks(configuration);
|
|
|
|
+ int maxChunksTolerable = getMaxChunksTolerable(configuration);
|
|
|
|
+
|
|
// Number of chunks each map will process, on average.
|
|
// Number of chunks each map will process, on average.
|
|
int splitRatio = getListingSplitRatio(configuration, numMaps, numRecords);
|
|
int splitRatio = getListingSplitRatio(configuration, numMaps, numRecords);
|
|
- validateNumChunksUsing(splitRatio, numMaps);
|
|
|
|
|
|
+ validateNumChunksUsing(splitRatio, numMaps, maxChunksTolerable);
|
|
|
|
|
|
int numEntriesPerChunk = (int)Math.ceil((float)numRecords
|
|
int numEntriesPerChunk = (int)Math.ceil((float)numRecords
|
|
/(splitRatio * numMaps));
|
|
/(splitRatio * numMaps));
|
|
@@ -168,9 +170,9 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
|
|
return chunksFinal;
|
|
return chunksFinal;
|
|
}
|
|
}
|
|
|
|
|
|
- private static void validateNumChunksUsing(int splitRatio, int numMaps)
|
|
|
|
- throws IOException {
|
|
|
|
- if (splitRatio * numMaps > MAX_CHUNKS_TOLERABLE)
|
|
|
|
|
|
+ private static void validateNumChunksUsing(int splitRatio, int numMaps,
|
|
|
|
+ int maxChunksTolerable) throws IOException {
|
|
|
|
+ if (splitRatio * numMaps > maxChunksTolerable)
|
|
throw new IOException("Too many chunks created with splitRatio:"
|
|
throw new IOException("Too many chunks created with splitRatio:"
|
|
+ splitRatio + ", numMaps:" + numMaps
|
|
+ splitRatio + ", numMaps:" + numMaps
|
|
+ ". Reduce numMaps or decrease split-ratio to proceed.");
|
|
+ ". Reduce numMaps or decrease split-ratio to proceed.");
|
|
@@ -238,14 +240,61 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
|
|
int numMaps, int numPaths) {
|
|
int numMaps, int numPaths) {
|
|
return configuration.getInt(
|
|
return configuration.getInt(
|
|
CONF_LABEL_LISTING_SPLIT_RATIO,
|
|
CONF_LABEL_LISTING_SPLIT_RATIO,
|
|
- getSplitRatio(numMaps, numPaths));
|
|
|
|
|
|
+ getSplitRatio(numMaps, numPaths, configuration));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static int getMaxChunksTolerable(Configuration conf) {
|
|
|
|
+ int maxChunksTolerable = conf.getInt(
|
|
|
|
+ DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE,
|
|
|
|
+ DistCpConstants.MAX_CHUNKS_TOLERABLE_DEFAULT);
|
|
|
|
+ if (maxChunksTolerable <= 0) {
|
|
|
|
+ LOG.warn(DistCpConstants.CONF_LABEL_MAX_CHUNKS_TOLERABLE +
|
|
|
|
+ " should be positive. Fall back to default value: "
|
|
|
|
+ + DistCpConstants.MAX_CHUNKS_TOLERABLE_DEFAULT);
|
|
|
|
+ maxChunksTolerable = DistCpConstants.MAX_CHUNKS_TOLERABLE_DEFAULT;
|
|
|
|
+ }
|
|
|
|
+ return maxChunksTolerable;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static int getMaxChunksIdeal(Configuration conf) {
|
|
|
|
+ int maxChunksIdeal = conf.getInt(
|
|
|
|
+ DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL,
|
|
|
|
+ DistCpConstants.MAX_CHUNKS_IDEAL_DEFAULT);
|
|
|
|
+ if (maxChunksIdeal <= 0) {
|
|
|
|
+ LOG.warn(DistCpConstants.CONF_LABEL_MAX_CHUNKS_IDEAL +
|
|
|
|
+ " should be positive. Fall back to default value: "
|
|
|
|
+ + DistCpConstants.MAX_CHUNKS_IDEAL_DEFAULT);
|
|
|
|
+ maxChunksIdeal = DistCpConstants.MAX_CHUNKS_IDEAL_DEFAULT;
|
|
|
|
+ }
|
|
|
|
+ return maxChunksIdeal;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static int getMinRecordsPerChunk(Configuration conf) {
|
|
|
|
+ int minRecordsPerChunk = conf.getInt(
|
|
|
|
+ DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK,
|
|
|
|
+ DistCpConstants.MIN_RECORDS_PER_CHUNK_DEFAULT);
|
|
|
|
+ if (minRecordsPerChunk <= 0) {
|
|
|
|
+ LOG.warn(DistCpConstants.CONF_LABEL_MIN_RECORDS_PER_CHUNK +
|
|
|
|
+ " should be positive. Fall back to default value: "
|
|
|
|
+ + DistCpConstants.MIN_RECORDS_PER_CHUNK_DEFAULT);
|
|
|
|
+ minRecordsPerChunk = DistCpConstants.MIN_RECORDS_PER_CHUNK_DEFAULT;
|
|
|
|
+ }
|
|
|
|
+ return minRecordsPerChunk;
|
|
}
|
|
}
|
|
|
|
|
|
- private static final int MAX_CHUNKS_TOLERABLE = 400;
|
|
|
|
- private static final int MAX_CHUNKS_IDEAL = 100;
|
|
|
|
- private static final int MIN_RECORDS_PER_CHUNK = 5;
|
|
|
|
- private static final int SPLIT_RATIO_DEFAULT = 2;
|
|
|
|
-
|
|
|
|
|
|
+ private static int getSplitRatio(Configuration conf) {
|
|
|
|
+ int splitRatio = conf.getInt(
|
|
|
|
+ DistCpConstants.CONF_LABEL_SPLIT_RATIO,
|
|
|
|
+ DistCpConstants.SPLIT_RATIO_DEFAULT);
|
|
|
|
+ if (splitRatio <= 0) {
|
|
|
|
+ LOG.warn(DistCpConstants.CONF_LABEL_SPLIT_RATIO +
|
|
|
|
+ " should be positive. Fall back to default value: "
|
|
|
|
+ + DistCpConstants.SPLIT_RATIO_DEFAULT);
|
|
|
|
+ splitRatio = DistCpConstants.SPLIT_RATIO_DEFAULT;
|
|
|
|
+ }
|
|
|
|
+ return splitRatio;
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Package private, for testability.
|
|
* Package private, for testability.
|
|
* @param nMaps The number of maps requested for.
|
|
* @param nMaps The number of maps requested for.
|
|
@@ -253,19 +302,34 @@ public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
|
|
* @return The number of splits each map should handle, ideally.
|
|
* @return The number of splits each map should handle, ideally.
|
|
*/
|
|
*/
|
|
static int getSplitRatio(int nMaps, int nRecords) {
|
|
static int getSplitRatio(int nMaps, int nRecords) {
|
|
|
|
+ return getSplitRatio(nMaps, nRecords,new Configuration());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Package private, for testability.
|
|
|
|
+ * @param nMaps The number of maps requested for.
|
|
|
|
+ * @param nRecords The number of records to be copied.
|
|
|
|
+ * @param conf The configuration set by users.
|
|
|
|
+ * @return The number of splits each map should handle, ideally.
|
|
|
|
+ */
|
|
|
|
+ static int getSplitRatio(int nMaps, int nRecords, Configuration conf) {
|
|
|
|
+ int maxChunksIdeal = getMaxChunksIdeal(conf);
|
|
|
|
+ int minRecordsPerChunk = getMinRecordsPerChunk(conf);
|
|
|
|
+ int splitRatio = getSplitRatio(conf);
|
|
|
|
+
|
|
if (nMaps == 1) {
|
|
if (nMaps == 1) {
|
|
LOG.warn("nMaps == 1. Why use DynamicInputFormat?");
|
|
LOG.warn("nMaps == 1. Why use DynamicInputFormat?");
|
|
return 1;
|
|
return 1;
|
|
}
|
|
}
|
|
|
|
|
|
- if (nMaps > MAX_CHUNKS_IDEAL)
|
|
|
|
- return SPLIT_RATIO_DEFAULT;
|
|
|
|
|
|
+ if (nMaps > maxChunksIdeal)
|
|
|
|
+ return splitRatio;
|
|
|
|
|
|
- int nPickups = (int)Math.ceil((float)MAX_CHUNKS_IDEAL/nMaps);
|
|
|
|
|
|
+ int nPickups = (int)Math.ceil((float)maxChunksIdeal/nMaps);
|
|
int nRecordsPerChunk = (int)Math.ceil((float)nRecords/(nMaps*nPickups));
|
|
int nRecordsPerChunk = (int)Math.ceil((float)nRecords/(nMaps*nPickups));
|
|
|
|
|
|
- return nRecordsPerChunk < MIN_RECORDS_PER_CHUNK ?
|
|
|
|
- SPLIT_RATIO_DEFAULT : nPickups;
|
|
|
|
|
|
+ return nRecordsPerChunk < minRecordsPerChunk ?
|
|
|
|
+ splitRatio : nPickups;
|
|
}
|
|
}
|
|
|
|
|
|
static int getNumEntriesPerChunk(Configuration configuration) {
|
|
static int getNumEntriesPerChunk(Configuration configuration) {
|