|
@@ -99,7 +99,9 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
|
|
|
|
|
|
private long usedMemory;
|
|
private long usedMemory;
|
|
private long commitMemory;
|
|
private long commitMemory;
|
|
- private final long maxSingleShuffleLimit;
|
|
|
|
|
|
+
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ final long maxSingleShuffleLimit;
|
|
|
|
|
|
private final int memToMemMergeOutputsThreshold;
|
|
private final int memToMemMergeOutputsThreshold;
|
|
private final long mergeThreshold;
|
|
private final long mergeThreshold;
|
|
@@ -187,10 +189,16 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
|
|
|
|
|
|
usedMemory = 0L;
|
|
usedMemory = 0L;
|
|
commitMemory = 0L;
|
|
commitMemory = 0L;
|
|
- this.maxSingleShuffleLimit =
|
|
|
|
- (long)(memoryLimit * singleShuffleMemoryLimitPercent);
|
|
|
|
- this.memToMemMergeOutputsThreshold =
|
|
|
|
- jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
|
|
|
|
|
|
+ long maxSingleShuffleLimitConfiged =
|
|
|
|
+ (long)(memoryLimit * singleShuffleMemoryLimitPercent);
|
|
|
|
+ if(maxSingleShuffleLimitConfiged > Integer.MAX_VALUE) {
|
|
|
|
+ maxSingleShuffleLimitConfiged = Integer.MAX_VALUE;
|
|
|
|
+ LOG.info("The max number of bytes for a single in-memory shuffle cannot" +
|
|
|
|
+ " be larger than Integer.MAX_VALUE. Setting it to Integer.MAX_VALUE");
|
|
|
|
+ }
|
|
|
|
+ this.maxSingleShuffleLimit = maxSingleShuffleLimitConfiged;
|
|
|
|
+ this.memToMemMergeOutputsThreshold =
|
|
|
|
+ jobConf.getInt(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, ioSortFactor);
|
|
this.mergeThreshold = (long)(this.memoryLimit *
|
|
this.mergeThreshold = (long)(this.memoryLimit *
|
|
jobConf.getFloat(
|
|
jobConf.getFloat(
|
|
MRJobConfig.SHUFFLE_MERGE_PERCENT,
|
|
MRJobConfig.SHUFFLE_MERGE_PERCENT,
|
|
@@ -249,17 +257,13 @@ public class MergeManagerImpl<K, V> implements MergeManager<K, V> {
|
|
public void waitForResource() throws InterruptedException {
|
|
public void waitForResource() throws InterruptedException {
|
|
inMemoryMerger.waitForMerge();
|
|
inMemoryMerger.waitForMerge();
|
|
}
|
|
}
|
|
-
|
|
|
|
- private boolean canShuffleToMemory(long requestedSize) {
|
|
|
|
- return (requestedSize < maxSingleShuffleLimit);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId,
|
|
public synchronized MapOutput<K,V> reserve(TaskAttemptID mapId,
|
|
long requestedSize,
|
|
long requestedSize,
|
|
int fetcher
|
|
int fetcher
|
|
) throws IOException {
|
|
) throws IOException {
|
|
- if (!canShuffleToMemory(requestedSize)) {
|
|
|
|
|
|
+ if (requestedSize > maxSingleShuffleLimit) {
|
|
LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
|
|
LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
|
|
" is greater than maxSingleShuffleLimit (" +
|
|
" is greater than maxSingleShuffleLimit (" +
|
|
maxSingleShuffleLimit + ")");
|
|
maxSingleShuffleLimit + ")");
|