|
@@ -209,7 +209,7 @@ public class TestMergeManager {
|
|
assertEquals(100, jobConf.getInt(MRJobConfig.IO_SORT_MB, 10));
|
|
assertEquals(100, jobConf.getInt(MRJobConfig.IO_SORT_MB, 10));
|
|
}
|
|
}
|
|
|
|
|
|
- @SuppressWarnings({ "unchecked", "deprecation" })
|
|
|
|
|
|
+ @SuppressWarnings({ "unchecked" })
|
|
@Test
|
|
@Test
|
|
@Timeout(value = 10)
|
|
@Timeout(value = 10)
|
|
public void testOnDiskMerger() throws IOException {
|
|
public void testOnDiskMerger() throws IOException {
|
|
@@ -229,29 +229,29 @@ public class TestMergeManager {
|
|
// make sure the io.sort.factor is set properly
|
|
// make sure the io.sort.factor is set properly
|
|
assertEquals(mergeFactor, SORT_FACTOR);
|
|
assertEquals(mergeFactor, SORT_FACTOR);
|
|
|
|
|
|
- // Stop the onDiskMerger thread so that we can intercept the list of files
|
|
|
|
- // waiting to be merged.
|
|
|
|
- onDiskMerger.suspend();
|
|
|
|
-
|
|
|
|
- //Send the list of fake files waiting to be merged
|
|
|
|
- Random rand = new Random();
|
|
|
|
- for(int i = 0; i < 2*SORT_FACTOR; ++i) {
|
|
|
|
- Path path = new Path("somePath");
|
|
|
|
- CompressAwarePath cap = new CompressAwarePath(path, 1L, rand.nextInt());
|
|
|
|
- manager.closeOnDiskFile(cap);
|
|
|
|
- }
|
|
|
|
|
|
+ // Suspend the onDiskMerger thread main loop so that we can intercept the
|
|
|
|
+ // list of files waiting to be merged.
|
|
|
|
+ synchronized (onDiskMerger.getPendingToBeMerged()) {
|
|
|
|
+ //Send the list of fake files waiting to be merged
|
|
|
|
+ Random rand = new Random();
|
|
|
|
+ for(int i = 0; i < 2*SORT_FACTOR; ++i) {
|
|
|
|
+ Path path = new Path("somePath");
|
|
|
|
+ CompressAwarePath cap = new CompressAwarePath(path, 1L, rand.nextInt());
|
|
|
|
+ manager.closeOnDiskFile(cap);
|
|
|
|
+ }
|
|
|
|
|
|
- //Check that the files pending to be merged are in sorted order.
|
|
|
|
- LinkedList<List<CompressAwarePath>> pendingToBeMerged = onDiskMerger.getPendingToBeMerged();
|
|
|
|
- assertTrue(pendingToBeMerged.size() > 0,
|
|
|
|
- "No inputs were added to list pending to merge");
|
|
|
|
- for(int i = 0; i < pendingToBeMerged.size(); ++i) {
|
|
|
|
- List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
|
|
|
|
- for(int j = 1; j < inputs.size(); ++j) {
|
|
|
|
- assertTrue(inputs.size() > 0 && inputs.size() <= SORT_FACTOR,
|
|
|
|
- "Not enough / too many inputs were going to be merged");
|
|
|
|
- assertTrue(inputs.get(j).getCompressedSize() >= inputs.get(j - 1).getCompressedSize(),
|
|
|
|
- "Inputs to be merged were not sorted according to size: ");
|
|
|
|
|
|
+ //Check that the files pending to be merged are in sorted order.
|
|
|
|
+ LinkedList<List<CompressAwarePath>> pendingToBeMerged = onDiskMerger.getPendingToBeMerged();
|
|
|
|
+ assertTrue(pendingToBeMerged.size() > 0,
|
|
|
|
+ "No inputs were added to list pending to merge");
|
|
|
|
+ for(int i = 0; i < pendingToBeMerged.size(); ++i) {
|
|
|
|
+ List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
|
|
|
|
+ for(int j = 1; j < inputs.size(); ++j) {
|
|
|
|
+ assertTrue(inputs.size() > 0 && inputs.size() <= SORT_FACTOR,
|
|
|
|
+ "Not enough / too many inputs were going to be merged");
|
|
|
|
+ assertTrue(inputs.get(j).getCompressedSize() >= inputs.get(j - 1).getCompressedSize(),
|
|
|
|
+ "Inputs to be merged were not sorted according to size: ");
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|