|
@@ -28,14 +28,21 @@ import static org.mockito.Mockito.times;
|
|
import static org.mockito.Mockito.verify;
|
|
import static org.mockito.Mockito.verify;
|
|
import static org.mockito.Mockito.when;
|
|
import static org.mockito.Mockito.when;
|
|
|
|
|
|
|
|
+import java.io.ByteArrayInputStream;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
|
+import java.io.InputStream;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.InetSocketAddress;
|
|
|
|
+import java.util.ArrayList;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
|
|
+import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
+import java.util.concurrent.CopyOnWriteArrayList;
|
|
|
|
|
|
import com.google.common.base.Supplier;
|
|
import com.google.common.base.Supplier;
|
|
|
|
+import org.junit.After;
|
|
import org.junit.Assert;
|
|
import org.junit.Assert;
|
|
|
|
+import org.junit.BeforeClass;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileStatus;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
@@ -43,6 +50,7 @@ import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.RawLocalFileSystem;
|
|
import org.apache.hadoop.fs.RawLocalFileSystem;
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
import org.apache.hadoop.mapred.JobConf;
|
|
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
|
|
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
|
|
|
|
+import org.apache.hadoop.mapred.ReduceTaskAttemptImpl;
|
|
import org.apache.hadoop.mapreduce.Counters;
|
|
import org.apache.hadoop.mapreduce.Counters;
|
|
import org.apache.hadoop.mapreduce.JobCounter;
|
|
import org.apache.hadoop.mapreduce.JobCounter;
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
@@ -83,24 +91,36 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
|
|
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
|
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
|
import org.apache.hadoop.security.Credentials;
|
|
import org.apache.hadoop.security.Credentials;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
|
+import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.event.Event;
|
|
import org.apache.hadoop.yarn.event.Event;
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
|
+import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
import org.apache.hadoop.yarn.util.Clock;
|
|
import org.apache.hadoop.yarn.util.Clock;
|
|
import org.apache.hadoop.yarn.util.ControlledClock;
|
|
import org.apache.hadoop.yarn.util.ControlledClock;
|
|
import org.apache.hadoop.yarn.util.SystemClock;
|
|
import org.apache.hadoop.yarn.util.SystemClock;
|
|
|
|
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
|
|
|
+import org.apache.log4j.AppenderSkeleton;
|
|
|
|
+import org.apache.log4j.Level;
|
|
|
|
+import org.apache.log4j.Logger;
|
|
|
|
+import org.apache.log4j.spi.LoggingEvent;
|
|
import org.junit.Test;
|
|
import org.junit.Test;
|
|
import org.mockito.ArgumentCaptor;
|
|
import org.mockito.ArgumentCaptor;
|
|
|
|
|
|
|
|
+import com.google.common.collect.ImmutableList;
|
|
|
|
+
|
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
|
public class TestTaskAttempt{
|
|
public class TestTaskAttempt{
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ private static final String CUSTOM_RESOURCE_NAME = "a-custom-resource";
|
|
|
|
+
|
|
static public class StubbedFS extends RawLocalFileSystem {
|
|
static public class StubbedFS extends RawLocalFileSystem {
|
|
@Override
|
|
@Override
|
|
public FileStatus getFileStatus(Path f) throws IOException {
|
|
public FileStatus getFileStatus(Path f) throws IOException {
|
|
@@ -108,6 +128,63 @@ public class TestTaskAttempt{
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static class CustomResourceTypesConfigurationProvider
|
|
|
|
+ extends LocalConfigurationProvider {
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public InputStream getConfigurationInputStream(Configuration bootstrapConf,
|
|
|
|
+ String name) throws YarnException, IOException {
|
|
|
|
+ if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
|
|
|
|
+ return new ByteArrayInputStream(
|
|
|
|
+ ("<configuration>\n" +
|
|
|
|
+ " <property>\n" +
|
|
|
|
+ " <name>yarn.resource-types</name>\n" +
|
|
|
|
+ " <value>a-custom-resource</value>\n" +
|
|
|
|
+ " </property>\n" +
|
|
|
|
+ " <property>\n" +
|
|
|
|
+ " <name>yarn.resource-types.a-custom-resource.units</name>\n" +
|
|
|
|
+ " <value>G</value>\n" +
|
|
|
|
+ " </property>\n" +
|
|
|
|
+ "</configuration>\n").getBytes());
|
|
|
|
+ } else {
|
|
|
|
+ return super.getConfigurationInputStream(bootstrapConf, name);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static class TestAppender extends AppenderSkeleton {
|
|
|
|
+
|
|
|
|
+ private final List<LoggingEvent> logEvents = new CopyOnWriteArrayList<>();
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public boolean requiresLayout() {
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void close() {
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ protected void append(LoggingEvent arg0) {
|
|
|
|
+ logEvents.add(arg0);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private List<LoggingEvent> getLogEvents() {
|
|
|
|
+ return logEvents;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @BeforeClass
|
|
|
|
+ public static void setupBeforeClass() {
|
|
|
|
+ ResourceUtils.resetResourceTypes(new Configuration());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @After
|
|
|
|
+ public void tearDown() {
|
|
|
|
+ ResourceUtils.resetResourceTypes(new Configuration());
|
|
|
|
+ }
|
|
|
|
+
|
|
@Test
|
|
@Test
|
|
public void testMRAppHistoryForMap() throws Exception {
|
|
public void testMRAppHistoryForMap() throws Exception {
|
|
MRApp app = new FailingAttemptsMRApp(1, 0);
|
|
MRApp app = new FailingAttemptsMRApp(1, 0);
|
|
@@ -329,17 +406,18 @@ public class TestTaskAttempt{
|
|
private TaskAttemptImpl createMapTaskAttemptImplForTest(
|
|
private TaskAttemptImpl createMapTaskAttemptImplForTest(
|
|
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
|
|
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
|
|
Clock clock = SystemClock.getInstance();
|
|
Clock clock = SystemClock.getInstance();
|
|
- return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
|
|
|
|
|
|
+ return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo,
|
|
|
|
+ clock, new JobConf());
|
|
}
|
|
}
|
|
|
|
|
|
private TaskAttemptImpl createMapTaskAttemptImplForTest(
|
|
private TaskAttemptImpl createMapTaskAttemptImplForTest(
|
|
- EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
|
|
|
|
|
|
+ EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo,
|
|
|
|
+ Clock clock, JobConf jobConf) {
|
|
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
|
ApplicationId appId = ApplicationId.newInstance(1, 1);
|
|
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
|
|
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
|
|
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
|
|
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
|
|
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
|
|
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
|
|
Path jobFile = mock(Path.class);
|
|
Path jobFile = mock(Path.class);
|
|
- JobConf jobConf = new JobConf();
|
|
|
|
TaskAttemptImpl taImpl =
|
|
TaskAttemptImpl taImpl =
|
|
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
|
|
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
|
|
taskSplitMetaInfo, jobConf, taListener, null,
|
|
taskSplitMetaInfo, jobConf, taListener, null,
|
|
@@ -347,6 +425,20 @@ public class TestTaskAttempt{
|
|
return taImpl;
|
|
return taImpl;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private TaskAttemptImpl createReduceTaskAttemptImplForTest(
|
|
|
|
+ EventHandler eventHandler, Clock clock, JobConf jobConf) {
|
|
|
|
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
|
|
|
|
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
|
|
|
|
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE);
|
|
|
|
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
|
|
|
|
+ Path jobFile = mock(Path.class);
|
|
|
|
+ TaskAttemptImpl taImpl =
|
|
|
|
+ new ReduceTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
|
|
|
|
+ 1, jobConf, taListener, null,
|
|
|
|
+ null, clock, null);
|
|
|
|
+ return taImpl;
|
|
|
|
+ }
|
|
|
|
+
|
|
private void testMRAppHistory(MRApp app) throws Exception {
|
|
private void testMRAppHistory(MRApp app) throws Exception {
|
|
Configuration conf = new Configuration();
|
|
Configuration conf = new Configuration();
|
|
Job job = app.submit(conf);
|
|
Job job = app.submit(conf);
|
|
@@ -1423,6 +1515,271 @@ public class TestTaskAttempt{
|
|
assertFalse("InternalError occurred", eventHandler.internalError);
|
|
assertFalse("InternalError occurred", eventHandler.internalError);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Test
|
|
|
|
+ public void testMapperCustomResourceTypes() {
|
|
|
|
+ initResourceTypes();
|
|
|
|
+ EventHandler eventHandler = mock(EventHandler.class);
|
|
|
|
+ TaskSplitMetaInfo taskSplitMetaInfo = new TaskSplitMetaInfo();
|
|
|
|
+ Clock clock = SystemClock.getInstance();
|
|
|
|
+ JobConf jobConf = new JobConf();
|
|
|
|
+ jobConf.setLong(MRJobConfig.MAP_RESOURCE_TYPE_PREFIX
|
|
|
|
+ + CUSTOM_RESOURCE_NAME, 7L);
|
|
|
|
+ TaskAttemptImpl taImpl = createMapTaskAttemptImplForTest(eventHandler,
|
|
|
|
+ taskSplitMetaInfo, clock, jobConf);
|
|
|
|
+ ResourceInformation resourceInfo =
|
|
|
|
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
|
|
|
+ getResourceInformation(CUSTOM_RESOURCE_NAME);
|
|
|
|
+ assertEquals("Expecting the default unit (G)",
|
|
|
|
+ "G", resourceInfo.getUnits());
|
|
|
|
+ assertEquals(7L, resourceInfo.getValue());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testReducerCustomResourceTypes() {
|
|
|
|
+ initResourceTypes();
|
|
|
|
+ EventHandler eventHandler = mock(EventHandler.class);
|
|
|
|
+ Clock clock = SystemClock.getInstance();
|
|
|
|
+ JobConf jobConf = new JobConf();
|
|
|
|
+ jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX
|
|
|
|
+ + CUSTOM_RESOURCE_NAME, "3m");
|
|
|
|
+ TaskAttemptImpl taImpl =
|
|
|
|
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
|
|
|
+ ResourceInformation resourceInfo =
|
|
|
|
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
|
|
|
+ getResourceInformation(CUSTOM_RESOURCE_NAME);
|
|
|
|
+ assertEquals("Expecting the specified unit (m)",
|
|
|
|
+ "m", resourceInfo.getUnits());
|
|
|
|
+ assertEquals(3L, resourceInfo.getValue());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testReducerMemoryRequestViaMapreduceReduceMemoryMb() {
|
|
|
|
+ EventHandler eventHandler = mock(EventHandler.class);
|
|
|
|
+ Clock clock = SystemClock.getInstance();
|
|
|
|
+ JobConf jobConf = new JobConf();
|
|
|
|
+ jobConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048);
|
|
|
|
+ TaskAttemptImpl taImpl =
|
|
|
|
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
|
|
|
+ long memorySize =
|
|
|
|
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
|
|
|
+ getMemorySize();
|
|
|
|
+ assertEquals(2048, memorySize);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testReducerMemoryRequestViaMapreduceReduceResourceMemory() {
|
|
|
|
+ EventHandler eventHandler = mock(EventHandler.class);
|
|
|
|
+ Clock clock = SystemClock.getInstance();
|
|
|
|
+ JobConf jobConf = new JobConf();
|
|
|
|
+ jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
|
|
|
|
+ MRJobConfig.RESOURCE_TYPE_NAME_MEMORY, "2 Gi");
|
|
|
|
+ TaskAttemptImpl taImpl =
|
|
|
|
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
|
|
|
+ long memorySize =
|
|
|
|
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
|
|
|
+ getMemorySize();
|
|
|
|
+ assertEquals(2048, memorySize);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testReducerMemoryRequestDefaultMemory() {
|
|
|
|
+ EventHandler eventHandler = mock(EventHandler.class);
|
|
|
|
+ Clock clock = SystemClock.getInstance();
|
|
|
|
+ TaskAttemptImpl taImpl =
|
|
|
|
+ createReduceTaskAttemptImplForTest(eventHandler, clock, new JobConf());
|
|
|
|
+ long memorySize =
|
|
|
|
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
|
|
|
+ getMemorySize();
|
|
|
|
+ assertEquals(MRJobConfig.DEFAULT_REDUCE_MEMORY_MB, memorySize);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testReducerMemoryRequestWithoutUnits() {
|
|
|
|
+ Clock clock = SystemClock.getInstance();
|
|
|
|
+ for (String memoryResourceName : ImmutableList.of(
|
|
|
|
+ MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
|
|
|
|
+ MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
|
|
|
|
+ EventHandler eventHandler = mock(EventHandler.class);
|
|
|
|
+ JobConf jobConf = new JobConf();
|
|
|
|
+ jobConf.setInt(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
|
|
|
|
+ memoryResourceName, 2048);
|
|
|
|
+ TaskAttemptImpl taImpl =
|
|
|
|
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
|
|
|
+ long memorySize =
|
|
|
|
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
|
|
|
+ getMemorySize();
|
|
|
|
+ assertEquals(2048, memorySize);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testReducerMemoryRequestOverriding() {
|
|
|
|
+ for (String memoryName : ImmutableList.of(
|
|
|
|
+ MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
|
|
|
|
+ MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
|
|
|
|
+ TestAppender testAppender = new TestAppender();
|
|
|
|
+ final Logger logger = Logger.getLogger(TaskAttemptImpl.class);
|
|
|
|
+ try {
|
|
|
|
+ logger.addAppender(testAppender);
|
|
|
|
+ EventHandler eventHandler = mock(EventHandler.class);
|
|
|
|
+ Clock clock = SystemClock.getInstance();
|
|
|
|
+ JobConf jobConf = new JobConf();
|
|
|
|
+ jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + memoryName,
|
|
|
|
+ "3Gi");
|
|
|
|
+ jobConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048);
|
|
|
|
+ TaskAttemptImpl taImpl =
|
|
|
|
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
|
|
|
+ long memorySize =
|
|
|
|
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
|
|
|
+ getMemorySize();
|
|
|
|
+ assertEquals(3072, memorySize);
|
|
|
|
+ boolean foundLogWarning = false;
|
|
|
|
+ for (LoggingEvent e : testAppender.getLogEvents()) {
|
|
|
|
+ if (e.getLevel() == Level.WARN && ("Configuration " +
|
|
|
|
+ "mapreduce.reduce.resource." + memoryName + "=3Gi is " +
|
|
|
|
+ "overriding the mapreduce.reduce.memory.mb=2048 configuration")
|
|
|
|
+ .equals(e.getMessage())) {
|
|
|
|
+ foundLogWarning = true;
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ assertTrue(foundLogWarning);
|
|
|
|
+ } finally {
|
|
|
|
+ logger.removeAppender(testAppender);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test(expected=IllegalArgumentException.class)
|
|
|
|
+ public void testReducerMemoryRequestMultipleName() {
|
|
|
|
+ EventHandler eventHandler = mock(EventHandler.class);
|
|
|
|
+ Clock clock = SystemClock.getInstance();
|
|
|
|
+ JobConf jobConf = new JobConf();
|
|
|
|
+ for (String memoryName : ImmutableList.of(
|
|
|
|
+ MRJobConfig.RESOURCE_TYPE_NAME_MEMORY,
|
|
|
|
+ MRJobConfig.RESOURCE_TYPE_ALTERNATIVE_NAME_MEMORY)) {
|
|
|
|
+ jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX + memoryName,
|
|
|
|
+ "3Gi");
|
|
|
|
+ }
|
|
|
|
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testReducerCpuRequestViaMapreduceReduceCpuVcores() {
|
|
|
|
+ EventHandler eventHandler = mock(EventHandler.class);
|
|
|
|
+ Clock clock = SystemClock.getInstance();
|
|
|
|
+ JobConf jobConf = new JobConf();
|
|
|
|
+ jobConf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 3);
|
|
|
|
+ TaskAttemptImpl taImpl =
|
|
|
|
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
|
|
|
+ int vCores =
|
|
|
|
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
|
|
|
+ getVirtualCores();
|
|
|
|
+ assertEquals(3, vCores);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testReducerCpuRequestViaMapreduceReduceResourceVcores() {
|
|
|
|
+ EventHandler eventHandler = mock(EventHandler.class);
|
|
|
|
+ Clock clock = SystemClock.getInstance();
|
|
|
|
+ JobConf jobConf = new JobConf();
|
|
|
|
+ jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
|
|
|
|
+ MRJobConfig.RESOURCE_TYPE_NAME_VCORE, "5");
|
|
|
|
+ TaskAttemptImpl taImpl =
|
|
|
|
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
|
|
|
+ int vCores =
|
|
|
|
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
|
|
|
+ getVirtualCores();
|
|
|
|
+ assertEquals(5, vCores);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testReducerCpuRequestDefaultMemory() {
|
|
|
|
+ EventHandler eventHandler = mock(EventHandler.class);
|
|
|
|
+ Clock clock = SystemClock.getInstance();
|
|
|
|
+ TaskAttemptImpl taImpl =
|
|
|
|
+ createReduceTaskAttemptImplForTest(eventHandler, clock, new JobConf());
|
|
|
|
+ int vCores =
|
|
|
|
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
|
|
|
+ getVirtualCores();
|
|
|
|
+ assertEquals(MRJobConfig.DEFAULT_REDUCE_CPU_VCORES, vCores);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testReducerCpuRequestOverriding() {
|
|
|
|
+ TestAppender testAppender = new TestAppender();
|
|
|
|
+ final Logger logger = Logger.getLogger(TaskAttemptImpl.class);
|
|
|
|
+ try {
|
|
|
|
+ logger.addAppender(testAppender);
|
|
|
|
+ EventHandler eventHandler = mock(EventHandler.class);
|
|
|
|
+ Clock clock = SystemClock.getInstance();
|
|
|
|
+ JobConf jobConf = new JobConf();
|
|
|
|
+ jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX +
|
|
|
|
+ MRJobConfig.RESOURCE_TYPE_NAME_VCORE, "7");
|
|
|
|
+ jobConf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 9);
|
|
|
|
+ TaskAttemptImpl taImpl =
|
|
|
|
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
|
|
|
+ long vCores =
|
|
|
|
+ getResourceInfoFromContainerRequest(taImpl, eventHandler).
|
|
|
|
+ getVirtualCores();
|
|
|
|
+ assertEquals(7, vCores);
|
|
|
|
+ boolean foundLogWarning = false;
|
|
|
|
+ for (LoggingEvent e : testAppender.getLogEvents()) {
|
|
|
|
+ if (e.getLevel() == Level.WARN && ("Configuration " +
|
|
|
|
+ "mapreduce.reduce.resource.vcores=7 is overriding the " +
|
|
|
|
+ "mapreduce.reduce.cpu.vcores=9 configuration"
|
|
|
|
+ ).equals(e.getMessage())) {
|
|
|
|
+ foundLogWarning = true;
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ assertTrue(foundLogWarning);
|
|
|
|
+ } finally {
|
|
|
|
+ logger.removeAppender(testAppender);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private Resource getResourceInfoFromContainerRequest(
|
|
|
|
+ TaskAttemptImpl taImpl, EventHandler eventHandler) {
|
|
|
|
+ taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
|
|
|
|
+ TaskAttemptEventType.TA_SCHEDULE));
|
|
|
|
+
|
|
|
|
+ assertEquals("Task attempt is not in STARTING state", taImpl.getState(),
|
|
|
|
+ TaskAttemptState.STARTING);
|
|
|
|
+
|
|
|
|
+ ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
|
|
|
|
+ verify(eventHandler, times(2)).handle(captor.capture());
|
|
|
|
+
|
|
|
|
+ List<ContainerRequestEvent> containerRequestEvents = new ArrayList<>();
|
|
|
|
+ for (Event e : captor.getAllValues()) {
|
|
|
|
+ if (e instanceof ContainerRequestEvent) {
|
|
|
|
+ containerRequestEvents.add((ContainerRequestEvent) e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ assertEquals("Expected one ContainerRequestEvent after scheduling "
|
|
|
|
+ + "task attempt", 1, containerRequestEvents.size());
|
|
|
|
+
|
|
|
|
+ return containerRequestEvents.get(0).getCapability();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test(expected=IllegalArgumentException.class)
|
|
|
|
+ public void testReducerCustomResourceTypeWithInvalidUnit() {
|
|
|
|
+ initResourceTypes();
|
|
|
|
+ EventHandler eventHandler = mock(EventHandler.class);
|
|
|
|
+ Clock clock = SystemClock.getInstance();
|
|
|
|
+ JobConf jobConf = new JobConf();
|
|
|
|
+ jobConf.set(MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX
|
|
|
|
+ + CUSTOM_RESOURCE_NAME, "3z");
|
|
|
|
+ createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void initResourceTypes() {
|
|
|
|
+ Configuration conf = new Configuration();
|
|
|
|
+ conf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
|
|
|
|
+ CustomResourceTypesConfigurationProvider.class.getName());
|
|
|
|
+ ResourceUtils.resetResourceTypes(conf);
|
|
|
|
+ }
|
|
|
|
+
|
|
private void setupTaskAttemptFinishingMonitor(
|
|
private void setupTaskAttemptFinishingMonitor(
|
|
EventHandler eventHandler, JobConf jobConf, AppContext appCtx) {
|
|
EventHandler eventHandler, JobConf jobConf, AppContext appCtx) {
|
|
TaskAttemptFinishingMonitor taskAttemptFinishingMonitor =
|
|
TaskAttemptFinishingMonitor taskAttemptFinishingMonitor =
|