|
@@ -35,16 +35,23 @@ import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
|
|
|
import junit.framework.TestCase;
|
|
|
|
|
|
public class TestIndexCache extends TestCase {
|
|
|
+ private JobConf conf;
|
|
|
+ private FileSystem fs;
|
|
|
+ private Path p;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void setUp() throws IOException {
|
|
|
+ conf = new JobConf();
|
|
|
+ fs = FileSystem.getLocal(conf).getRaw();
|
|
|
+ p = new Path(System.getProperty("test.build.data", "/tmp"),
|
|
|
+ "cache").makeQualified(fs.getUri(), fs.getWorkingDirectory());
|
|
|
+ }
|
|
|
|
|
|
public void testLRCPolicy() throws Exception {
|
|
|
Random r = new Random();
|
|
|
long seed = r.nextLong();
|
|
|
r.setSeed(seed);
|
|
|
System.out.println("seed: " + seed);
|
|
|
- JobConf conf = new JobConf();
|
|
|
- FileSystem fs = FileSystem.getLocal(conf).getRaw();
|
|
|
- Path p = new Path(System.getProperty("test.build.data", "/tmp"),
|
|
|
- "cache").makeQualified(fs);
|
|
|
fs.delete(p, true);
|
|
|
conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
|
|
|
final int partsPerMap = 1000;
|
|
@@ -115,10 +122,6 @@ public class TestIndexCache extends TestCase {
|
|
|
|
|
|
public void testBadIndex() throws Exception {
|
|
|
final int parts = 30;
|
|
|
- JobConf conf = new JobConf();
|
|
|
- FileSystem fs = FileSystem.getLocal(conf).getRaw();
|
|
|
- Path p = new Path(System.getProperty("test.build.data", "/tmp"),
|
|
|
- "cache").makeQualified(fs);
|
|
|
fs.delete(p, true);
|
|
|
conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
|
|
|
IndexCache cache = new IndexCache(conf);
|
|
@@ -150,10 +153,6 @@ public class TestIndexCache extends TestCase {
|
|
|
}
|
|
|
|
|
|
public void testInvalidReduceNumberOrLength() throws Exception {
|
|
|
- JobConf conf = new JobConf();
|
|
|
- FileSystem fs = FileSystem.getLocal(conf).getRaw();
|
|
|
- Path p = new Path(System.getProperty("test.build.data", "/tmp"),
|
|
|
- "cache").makeQualified(fs);
|
|
|
fs.delete(p, true);
|
|
|
conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
|
|
|
final int partsPerMap = 1000;
|
|
@@ -199,10 +198,6 @@ public class TestIndexCache extends TestCase {
|
|
|
// This test case may not repeatable. But on my macbook this test
|
|
|
// fails with probability of 100% on code before MAPREDUCE-2541,
|
|
|
// so it is repeatable in practice.
|
|
|
- JobConf conf = new JobConf();
|
|
|
- FileSystem fs = FileSystem.getLocal(conf).getRaw();
|
|
|
- Path p = new Path(System.getProperty("test.build.data", "/tmp"),
|
|
|
- "cache").makeQualified(fs);
|
|
|
fs.delete(p, true);
|
|
|
conf.setInt(TTConfig.TT_INDEX_CACHE, 10);
|
|
|
// Make a big file so removeMapThread almost surely runs faster than
|
|
@@ -247,6 +242,66 @@ public class TestIndexCache extends TestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void testCreateRace() throws Exception {
|
|
|
+ fs.delete(p, true);
|
|
|
+ conf.setInt(TTConfig.TT_INDEX_CACHE, 1);
|
|
|
+ final int partsPerMap = 1000;
|
|
|
+ final int bytesPerFile = partsPerMap * 24;
|
|
|
+ final IndexCache cache = new IndexCache(conf);
|
|
|
+
|
|
|
+ final Path racy = new Path(p, "racyIndex");
|
|
|
+ final String user =
|
|
|
+ UserGroupInformation.getCurrentUser().getShortUserName();
|
|
|
+ writeFile(fs, racy, bytesPerFile, partsPerMap);
|
|
|
+
|
|
|
+ // run multiple instances
|
|
|
+ Thread[] getInfoThreads = new Thread[50];
|
|
|
+ for (int i = 0; i < 50; i++) {
|
|
|
+ getInfoThreads[i] = new Thread() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ cache.getIndexInformation("racyIndex", partsPerMap, racy, user);
|
|
|
+ cache.removeMap("racyIndex");
|
|
|
+ } catch (Exception e) {
|
|
|
+ // should not be here
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ for (int i = 0; i < 50; i++) {
|
|
|
+ getInfoThreads[i].start();
|
|
|
+ }
|
|
|
+
|
|
|
+ final Thread mainTestThread = Thread.currentThread();
|
|
|
+
|
|
|
+ Thread timeoutThread = new Thread() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ Thread.sleep(15000);
|
|
|
+ mainTestThread.interrupt();
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ // we are done;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ for (int i = 0; i < 50; i++) {
|
|
|
+ try {
|
|
|
+ getInfoThreads[i].join();
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ // we haven't finished in time. Potential deadlock/race.
|
|
|
+ fail("Unexpectedly long delay during concurrent cache entry creations");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // stop the timeoutThread. If we get interrupted before stopping, there
|
|
|
+ // must be something wrong, although it wasn't a deadlock. No need to
|
|
|
+ // catch and swallow.
|
|
|
+ timeoutThread.interrupt();
|
|
|
+ }
|
|
|
+
|
|
|
private static void checkRecord(IndexRecord rec, long fill) {
|
|
|
assertEquals(fill, rec.startOffset);
|
|
|
assertEquals(fill, rec.rawLength);
|