|
@@ -24,6 +24,7 @@ import java.io.IOException;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
|
|
|
+import com.google.common.cache.Cache;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileContext;
|
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
@@ -35,16 +36,27 @@ import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobListCache;
|
|
|
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
|
|
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
|
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
|
|
|
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.junit.After;
|
|
|
import org.junit.Test;
|
|
|
import org.mockito.Mockito;
|
|
|
|
|
|
-import static org.junit.Assert.assertEquals;
|
|
|
-import static org.mockito.Mockito.*;
|
|
|
-import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
|
|
-
|
|
|
+import static junit.framework.TestCase.assertEquals;
|
|
|
import static org.junit.Assert.assertFalse;
|
|
|
+import static org.junit.Assert.assertNull;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
+import static org.junit.Assert.fail;
|
|
|
+import static org.mockito.Matchers.any;
|
|
|
+import static org.mockito.Matchers.eq;
|
|
|
+import static org.mockito.Mockito.doNothing;
|
|
|
+import static org.mockito.Mockito.doReturn;
|
|
|
+import static org.mockito.Mockito.mock;
|
|
|
+import static org.mockito.Mockito.spy;
|
|
|
+import static org.mockito.Mockito.timeout;
|
|
|
+import static org.mockito.Mockito.verify;
|
|
|
+import static org.mockito.Mockito.when;
|
|
|
+
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
|
|
|
|
|
public class TestJobHistory {
|
|
|
|
|
@@ -58,13 +70,15 @@ public class TestJobHistory {
|
|
|
|
|
|
Configuration conf = new Configuration();
|
|
|
// Set the cache size to 2
|
|
|
- conf.set(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, "2");
|
|
|
+ conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, 2);
|
|
|
jobHistory.init(conf);
|
|
|
jobHistory.start();
|
|
|
|
|
|
CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
|
|
|
.getHistoryStorage());
|
|
|
|
|
|
+ assertFalse(storage.getUseLoadedTasksCache());
|
|
|
+
|
|
|
Job[] jobs = new Job[3];
|
|
|
JobId[] jobIds = new JobId[3];
|
|
|
|
|
@@ -84,14 +98,13 @@ public class TestJobHistory {
|
|
|
storage.getFullJob(jobs[i].getID());
|
|
|
}
|
|
|
|
|
|
- Map<JobId, Job> jobCache = storage.getLoadedJobCache();
|
|
|
- // job0 should have been purged since cache size is 2
|
|
|
- assertFalse(jobCache.containsKey(jobs[0].getID()));
|
|
|
- assertTrue(jobCache.containsKey(jobs[1].getID())
|
|
|
- && jobCache.containsKey(jobs[2].getID()));
|
|
|
+ Cache<JobId, Job> jobCache = storage.getLoadedJobCache();
|
|
|
+ // Verify some jobs are stored in the cache. Hard to predict eviction
|
|
|
+ // in Guava version.
|
|
|
+ assertTrue(jobCache.size() > 0);
|
|
|
|
|
|
// Setting cache size to 3
|
|
|
- conf.set(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, "3");
|
|
|
+ conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, 3);
|
|
|
doReturn(conf).when(storage).createConf();
|
|
|
|
|
|
when(fileInfo.loadJob()).thenReturn(jobs[0]).thenReturn(jobs[1])
|
|
@@ -105,9 +118,223 @@ public class TestJobHistory {
|
|
|
|
|
|
jobCache = storage.getLoadedJobCache();
|
|
|
|
|
|
- // All three jobs should be in cache since its size is now 3
|
|
|
- for (int i = 0; i < 3; i++) {
|
|
|
- assertTrue(jobCache.containsKey(jobs[i].getID()));
|
|
|
+ // Verify some jobs are stored in the cache. Hard to predict eviction
|
|
|
+ // in Guava version.
|
|
|
+ assertTrue(jobCache.size() > 0);
|
|
|
+ }
|
|
|
+
|
|
|
+  /**
+   * Verifies that loading large jobs shrinks the loaded job cache when the
+   * loaded-tasks cache size is configured.
+   *
+   * <p>The loaded-tasks cache size is set to 50. Ten small jobs (10 maps and
+   * 2 reduces each) are loaded first, followed by three large jobs (2000 maps
+   * and 10 reduces each). The cache must report fewer entries after the large
+   * jobs are loaded than it did after the small ones.
+   *
+   * @throws Exception if stubbing or loading a job fails unexpectedly
+   */
+  @Test
+  public void testTasksCacheLimit() throws Exception {
+    HistoryFileManager historyManager = mock(HistoryFileManager.class);
+    jobHistory = spy(new JobHistory());
+    doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+
+    Configuration conf = new Configuration();
+    // Set the cache threshold to 50 tasks
+    conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 50);
+    jobHistory.init(conf);
+    jobHistory.start();
+
+    CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+        .getHistoryStorage());
+
+    assertTrue(storage.getUseLoadedTasksCache());
+    // Expected value first, so a failure message reads correctly.
+    assertEquals(50, storage.getLoadedTasksCacheSize());
+
+    // Create a bunch of smaller jobs (<< 50 tasks)
+    Job[] jobs = new Job[10];
+    JobId[] jobIds = new JobId[10];
+    for (int i = 0; i < jobs.length; i++) {
+      jobs[i] = mock(Job.class);
+      jobIds[i] = mock(JobId.class);
+      when(jobs[i].getID()).thenReturn(jobIds[i]);
+      when(jobs[i].getTotalMaps()).thenReturn(10);
+      when(jobs[i].getTotalReduces()).thenReturn(2);
+    }
+
+    // Create some large jobs that forces task-based cache flushing
+    Job[] lgJobs = new Job[3];
+    JobId[] lgJobIds = new JobId[3];
+    for (int i = 0; i < lgJobs.length; i++) {
+      lgJobs[i] = mock(Job.class);
+      lgJobIds[i] = mock(JobId.class);
+      when(lgJobs[i].getID()).thenReturn(lgJobIds[i]);
+      when(lgJobs[i].getTotalMaps()).thenReturn(2000);
+      when(lgJobs[i].getTotalReduces()).thenReturn(10);
+    }
+
+    // Every lookup yields the same fileInfo; successive loadJob() calls hand
+    // back the small jobs in order, then the large ones.
+    HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
+    when(historyManager.getFileInfo(any(JobId.class))).thenReturn(fileInfo);
+    when(fileInfo.loadJob()).thenReturn(jobs[0]).thenReturn(jobs[1])
+        .thenReturn(jobs[2]).thenReturn(jobs[3]).thenReturn(jobs[4])
+        .thenReturn(jobs[5]).thenReturn(jobs[6]).thenReturn(jobs[7])
+        .thenReturn(jobs[8]).thenReturn(jobs[9]).thenReturn(lgJobs[0])
+        .thenReturn(lgJobs[1]).thenReturn(lgJobs[2]);
+
+    // getFullJob will put the job in the cache if it isn't there
+    Cache<JobId, Job> jobCache = storage.getLoadedJobCache();
+    for (int i = 0; i < jobs.length; i++) {
+      storage.getFullJob(jobs[i].getID());
+    }
+    long prevSize = jobCache.size();
+
+    // Fill the cache with some larger jobs and verify the cache
+    // gets reduced in size.
+    for (int i = 0; i < lgJobs.length; i++) {
+      storage.getFullJob(lgJobs[i].getID());
+    }
+    assertTrue(jobCache.size() < prevSize);
+  }
|
|
|
+
|
|
|
+  /**
+   * Verifies that the loaded job cache is never empty after a load, even
+   * when the loaded jobs are large relative to the configured limit.
+   *
+   * <p>The loaded-tasks cache size is set to 500 and ten jobs reporting
+   * 700 maps and 50 reduces each are loaded one after another. After every
+   * load the cache must contain at least one entry.
+   *
+   * @throws Exception if stubbing or loading a job fails unexpectedly
+   */
+  @Test
+  public void testJobCacheLimitLargerThanMax() throws Exception {
+    HistoryFileManager historyManager = mock(HistoryFileManager.class);
+    // NOTE(review): this local shadows the jobHistory field that other tests
+    // in this class assign; confirm whether teardown should see this instance.
+    JobHistory jobHistory = spy(new JobHistory());
+    doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+
+    Configuration conf = new Configuration();
+    // Set the cache threshold to 500 tasks
+    conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 500);
+    jobHistory.init(conf);
+    jobHistory.start();
+
+    CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+        .getHistoryStorage());
+
+    assertTrue(storage.getUseLoadedTasksCache());
+    // Expected value first, so a failure message reads correctly.
+    assertEquals(500, storage.getLoadedTasksCacheSize());
+
+    // Create a bunch of large jobs (700 maps + 50 reduces each, against a
+    // configured limit of 500 tasks)
+    Job[] lgJobs = new Job[10];
+    JobId[] lgJobIds = new JobId[10];
+    for (int i = 0; i < lgJobs.length; i++) {
+      lgJobs[i] = mock(Job.class);
+      lgJobIds[i] = mock(JobId.class);
+      when(lgJobs[i].getID()).thenReturn(lgJobIds[i]);
+      when(lgJobs[i].getTotalMaps()).thenReturn(700);
+      when(lgJobs[i].getTotalReduces()).thenReturn(50);
+    }
+
+    // Every lookup yields the same fileInfo; successive loadJob() calls hand
+    // back the large jobs in order.
+    HistoryFileInfo fileInfo = mock(HistoryFileInfo.class);
+    when(historyManager.getFileInfo(any(JobId.class))).thenReturn(fileInfo);
+    when(fileInfo.loadJob()).thenReturn(lgJobs[0]).thenReturn(lgJobs[1])
+        .thenReturn(lgJobs[2]).thenReturn(lgJobs[3]).thenReturn(lgJobs[4])
+        .thenReturn(lgJobs[5]).thenReturn(lgJobs[6]).thenReturn(lgJobs[7])
+        .thenReturn(lgJobs[8]).thenReturn(lgJobs[9]);
+
+    // getFullJob will put the job in the cache if it isn't there
+    Cache<JobId, Job> jobCache = storage.getLoadedJobCache();
+    for (int i = 0; i < lgJobs.length; i++) {
+      storage.getFullJob(lgJobs[i].getID());
+      // The cache must keep at least one entry after each load.
+      assertTrue(jobCache.size() > 0);
+    }
+  }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testLoadedTasksEmptyConfiguration() {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.set(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, "");
|
|
|
+
|
|
|
+ HistoryFileManager historyManager = mock(HistoryFileManager.class);
|
|
|
+ JobHistory jobHistory = spy(new JobHistory());
|
|
|
+ doReturn(historyManager).when(jobHistory).createHistoryFileManager();
|
|
|
+ jobHistory.init(conf);
|
|
|
+ jobHistory.start();
|
|
|
+
|
|
|
+ CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
|
|
|
+ .getHistoryStorage());
|
|
|
+
|
|
|
+ assertFalse(storage.getUseLoadedTasksCache());
|
|
|
+ }
|
|
|
+
|
|
|
+  /**
+   * Verifies that a loaded-tasks cache size of 0 still enables the
+   * loaded-tasks cache and that the storage reports a cache size of 1.
+   */
+  @Test
+  public void testLoadedTasksZeroConfiguration() {
+    Configuration conf = new Configuration();
+    conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 0);
+
+    HistoryFileManager historyManager = mock(HistoryFileManager.class);
+    // NOTE(review): this local shadows the jobHistory field that other tests
+    // in this class assign; confirm whether teardown should see this instance.
+    JobHistory jobHistory = spy(new JobHistory());
+    doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+    jobHistory.init(conf);
+    jobHistory.start();
+
+    CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+        .getHistoryStorage());
+
+    assertTrue(storage.getUseLoadedTasksCache());
+    // Expected value first, so a failure message reads correctly.
+    assertEquals(1, storage.getLoadedTasksCacheSize());
+  }
|
|
|
+
|
|
|
+  /**
+   * Verifies that a negative loaded-tasks cache size (-1) still enables the
+   * loaded-tasks cache and that the storage reports a cache size of 1.
+   */
+  @Test
+  public void testLoadedTasksNegativeConfiguration() {
+    Configuration conf = new Configuration();
+    conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, -1);
+
+    HistoryFileManager historyManager = mock(HistoryFileManager.class);
+    // NOTE(review): this local shadows the jobHistory field that other tests
+    // in this class assign; confirm whether teardown should see this instance.
+    JobHistory jobHistory = spy(new JobHistory());
+    doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+    jobHistory.init(conf);
+    jobHistory.start();
+
+    CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+        .getHistoryStorage());
+
+    assertTrue(storage.getUseLoadedTasksCache());
+    // Expected value first, so a failure message reads correctly.
+    assertEquals(1, storage.getLoadedTasksCacheSize());
+  }
|
|
|
+
|
|
|
+  /**
+   * Exercises the failure paths of {@code getFullJob} on the history storage.
+   *
+   * <p>Three situations are checked:
+   * <ul>
+   *   <li>{@code getFileInfo()} throws an {@link IOException}: a
+   *       {@code YarnRuntimeException} is expected;</li>
+   *   <li>{@code getFileInfo()} returns {@code null}: {@code getFullJob}
+   *       is expected to return {@code null};</li>
+   *   <li>{@code loadJob()} throws an {@link IOException}: a
+   *       {@code YarnRuntimeException} is expected.</li>
+   * </ul>
+   *
+   * @throws IOException declared because the stubbed calls declare it
+   */
+  @Test
+  public void testLoadJobErrorCases() throws IOException {
+    HistoryFileManager historyManager = mock(HistoryFileManager.class);
+    jobHistory = spy(new JobHistory());
+    doReturn(historyManager).when(jobHistory).createHistoryFileManager();
+
+    Configuration conf = new Configuration();
+    // Set the cache threshold to 50 tasks
+    conf.setInt(JHAdminConfig.MR_HISTORY_LOADED_TASKS_CACHE_SIZE, 50);
+    jobHistory.init(conf);
+    jobHistory.start();
+
+    CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory
+        .getHistoryStorage());
+
+    assertTrue(storage.getUseLoadedTasksCache());
+    // Expected value first, so a failure message reads correctly.
+    assertEquals(50, storage.getLoadedTasksCacheSize());
+
+    // Create jobs for bad fileInfo results
+    Job[] jobs = new Job[4];
+    JobId[] jobIds = new JobId[4];
+    for (int i = 0; i < jobs.length; i++) {
+      jobs[i] = mock(Job.class);
+      jobIds[i] = mock(JobId.class);
+      when(jobs[i].getID()).thenReturn(jobIds[i]);
+      when(jobs[i].getTotalMaps()).thenReturn(10);
+      when(jobs[i].getTotalReduces()).thenReturn(2);
+    }
+
+    // jobIds[0]: lookup itself fails; jobIds[1]: no file info found;
+    // jobIds[2]: file info found but loading the job fails.
+    HistoryFileInfo loadJobException = mock(HistoryFileInfo.class);
+    when(loadJobException.loadJob())
+        .thenThrow(new IOException("History file not found"));
+    when(historyManager.getFileInfo(jobIds[0]))
+        .thenThrow(new IOException(""));
+    when(historyManager.getFileInfo(jobIds[1])).thenReturn(null);
+    when(historyManager.getFileInfo(jobIds[2])).thenReturn(loadJobException);
+
+    try {
+      storage.getFullJob(jobIds[0]);
+      fail("Did not get expected YarnRuntimeException for getFileInfo() "
+          + "throwing IOException");
+    } catch (YarnRuntimeException expected) {
+      // Expected
+    }
+
+    // fileInfo==null should return null
+    Job job = storage.getFullJob(jobIds[1]);
+    assertNull(job);
+
+    try {
+      storage.getFullJob(jobIds[2]);
+      fail("Did not get expected YarnRuntimeException for fileInfo.loadJob() "
+          + "throwing IOException");
+    } catch (YarnRuntimeException expected) {
+      // Expected
     }
   }
|
|
|
|