|
@@ -21,9 +21,13 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
|
|
|
import static org.junit.Assert.assertEquals;
|
|
|
import static org.junit.Assert.assertFalse;
|
|
|
import static org.junit.Assert.assertTrue;
|
|
|
+import static org.mockito.ArgumentMatchers.any;
|
|
|
+import static org.mockito.Mockito.doReturn;
|
|
|
import static org.mockito.Mockito.mock;
|
|
|
+import static org.mockito.Mockito.never;
|
|
|
import static org.mockito.Mockito.spy;
|
|
|
import static org.mockito.Mockito.timeout;
|
|
|
+import static org.mockito.Mockito.verify;
|
|
|
|
|
|
import java.io.BufferedReader;
|
|
|
import java.io.File;
|
|
@@ -32,12 +36,14 @@ import java.io.IOException;
|
|
|
import java.io.PrintWriter;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.regex.Pattern;
|
|
|
|
|
|
import java.util.function.Supplier;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileUtil;
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
|
@@ -61,12 +67,19 @@ import org.apache.hadoop.yarn.api.records.Token;
|
|
|
import org.apache.hadoop.yarn.api.records.URL;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.Event;
|
|
|
+import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
|
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
|
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
|
|
@@ -76,6 +89,7 @@ import org.apache.hadoop.yarn.util.TestProcfsBasedProcessTree;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
+import org.mockito.ArgumentCaptor;
|
|
|
import org.mockito.Mockito;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -88,6 +102,7 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
|
|
|
static {
|
|
|
LOG = LoggerFactory.getLogger(TestContainersMonitor.class);
|
|
|
}
|
|
|
+
|
|
|
@Before
|
|
|
public void setup() throws IOException {
|
|
|
conf.setClass(
|
|
@@ -353,6 +368,164 @@ public class TestContainersMonitor extends BaseContainerManagerTest {
|
|
|
.build()));
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ @Test
|
|
|
+ public void testContainerKillOnExcessLogDirectory() throws Exception {
|
|
|
+ final String user = "someuser";
|
|
|
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
|
|
|
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
|
|
|
+ ContainerId cid = ContainerId.newContainerId(attemptId, 1);
|
|
|
+ Application app = mock(Application.class);
|
|
|
+ doReturn(user).when(app).getUser();
|
|
|
+ doReturn(appId).when(app).getAppId();
|
|
|
+ Container container = mock(Container.class);
|
|
|
+ doReturn(cid).when(container).getContainerId();
|
|
|
+ doReturn(user).when(container).getUser();
|
|
|
+ File containerLogDir = new File(new File(localLogDir, appId.toString()),
|
|
|
+ cid.toString());
|
|
|
+ containerLogDir.mkdirs();
|
|
|
+ LocalDirsHandlerService mockDirsHandler =
|
|
|
+ mock(LocalDirsHandlerService.class);
|
|
|
+ doReturn(Collections.singletonList(localLogDir.getAbsolutePath()))
|
|
|
+ .when(mockDirsHandler).getLogDirsForRead();
|
|
|
+ Context ctx = new NMContext(context.getContainerTokenSecretManager(),
|
|
|
+ context.getNMTokenSecretManager(), mockDirsHandler,
|
|
|
+ context.getApplicationACLsManager(), context.getNMStateStore(),
|
|
|
+ false, conf);
|
|
|
+
|
|
|
+ Configuration monitorConf = new Configuration(conf);
|
|
|
+ monitorConf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
|
|
|
+ monitorConf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
|
|
|
+ monitorConf.setBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE,
|
|
|
+ false);
|
|
|
+ monitorConf.setBoolean(YarnConfiguration.NM_CONTAINER_LOG_MONITOR_ENABLED,
|
|
|
+ true);
|
|
|
+ monitorConf.setLong(
|
|
|
+ YarnConfiguration.NM_CONTAINER_LOG_DIR_SIZE_LIMIT_BYTES, 10);
|
|
|
+ monitorConf.setLong(
|
|
|
+ YarnConfiguration.NM_CONTAINER_LOG_TOTAL_SIZE_LIMIT_BYTES, 10000000);
|
|
|
+ monitorConf.setLong(YarnConfiguration.NM_CONTAINER_LOG_MON_INTERVAL_MS,
|
|
|
+ 10);
|
|
|
+
|
|
|
+ EventHandler mockHandler = mock(EventHandler.class);
|
|
|
+ AsyncDispatcher mockDispatcher = mock(AsyncDispatcher.class);
|
|
|
+ doReturn(mockHandler).when(mockDispatcher).getEventHandler();
|
|
|
+ ContainersMonitor monitor = new ContainersMonitorImpl(
|
|
|
+ mock(ContainerExecutor.class), mockDispatcher, ctx);
|
|
|
+ monitor.init(monitorConf);
|
|
|
+ monitor.start();
|
|
|
+ Event event;
|
|
|
+ try {
|
|
|
+ ctx.getApplications().put(appId, app);
|
|
|
+ ctx.getContainers().put(cid, container);
|
|
|
+ monitor.handle(new ContainerStartMonitoringEvent(cid, 1, 1, 1, 0, 0));
|
|
|
+
|
|
|
+ PrintWriter fileWriter = new PrintWriter(new File(containerLogDir,
|
|
|
+ "log"));
|
|
|
+ fileWriter.write("This container is logging too much.");
|
|
|
+ fileWriter.close();
|
|
|
+
|
|
|
+ ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
|
|
|
+ verify(mockHandler, timeout(10000)).handle(captor.capture());
|
|
|
+ event = captor.getValue();
|
|
|
+ } finally {
|
|
|
+ monitor.stop();
|
|
|
+ }
|
|
|
+
|
|
|
+ assertTrue("Expected a kill event", event instanceof ContainerKillEvent);
|
|
|
+ ContainerKillEvent cke = (ContainerKillEvent) event;
|
|
|
+ assertEquals("Unexpected container exit status",
|
|
|
+ ContainerExitStatus.KILLED_FOR_EXCESS_LOGS,
|
|
|
+ cke.getContainerExitStatus());
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ @Test
|
|
|
+ public void testContainerKillOnExcessTotalLogs() throws Exception {
|
|
|
+ final String user = "someuser";
|
|
|
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
|
|
|
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
|
|
|
+ ContainerId cid = ContainerId.newContainerId(attemptId, 1);
|
|
|
+ Application app = mock(Application.class);
|
|
|
+ doReturn(user).when(app).getUser();
|
|
|
+ doReturn(appId).when(app).getAppId();
|
|
|
+ Container container = mock(Container.class);
|
|
|
+ doReturn(cid).when(container).getContainerId();
|
|
|
+ doReturn(user).when(container).getUser();
|
|
|
+ File logDir1 = new File(localLogDir, "dir1");
|
|
|
+ File logDir2 = new File(localLogDir, "dir2");
|
|
|
+ List<String> logDirs = new ArrayList<>();
|
|
|
+ logDirs.add(logDir1.getAbsolutePath());
|
|
|
+ logDirs.add(logDir2.getAbsolutePath());
|
|
|
+ LocalDirsHandlerService mockDirsHandler =
|
|
|
+ mock(LocalDirsHandlerService.class);
|
|
|
+ doReturn(logDirs).when(mockDirsHandler).getLogDirsForRead();
|
|
|
+ Context ctx = new NMContext(context.getContainerTokenSecretManager(),
|
|
|
+ context.getNMTokenSecretManager(), mockDirsHandler,
|
|
|
+ context.getApplicationACLsManager(), context.getNMStateStore(),
|
|
|
+ false, conf);
|
|
|
+
|
|
|
+ File clogDir1 = new File(new File(logDir1, appId.toString()),
|
|
|
+ cid.toString());
|
|
|
+ clogDir1.mkdirs();
|
|
|
+ File clogDir2 = new File(new File(logDir2, appId.toString()),
|
|
|
+ cid.toString());
|
|
|
+ clogDir2.mkdirs();
|
|
|
+
|
|
|
+ Configuration monitorConf = new Configuration(conf);
|
|
|
+ monitorConf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
|
|
|
+ monitorConf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
|
|
|
+ monitorConf.setBoolean(YarnConfiguration.NM_CONTAINER_METRICS_ENABLE,
|
|
|
+ false);
|
|
|
+ monitorConf.setBoolean(YarnConfiguration.NM_CONTAINER_LOG_MONITOR_ENABLED,
|
|
|
+ true);
|
|
|
+ monitorConf.setLong(
|
|
|
+ YarnConfiguration.NM_CONTAINER_LOG_DIR_SIZE_LIMIT_BYTES, 100000);
|
|
|
+ monitorConf.setLong(
|
|
|
+ YarnConfiguration.NM_CONTAINER_LOG_TOTAL_SIZE_LIMIT_BYTES, 15);
|
|
|
+ monitorConf.setLong(YarnConfiguration.NM_CONTAINER_LOG_MON_INTERVAL_MS,
|
|
|
+ 10);
|
|
|
+ monitorConf.set(YarnConfiguration.NM_LOG_DIRS, logDir1.getAbsolutePath()
|
|
|
+ + "," + logDir2.getAbsolutePath());
|
|
|
+
|
|
|
+ EventHandler mockHandler = mock(EventHandler.class);
|
|
|
+ AsyncDispatcher mockDispatcher = mock(AsyncDispatcher.class);
|
|
|
+ doReturn(mockHandler).when(mockDispatcher).getEventHandler();
|
|
|
+ ContainersMonitor monitor = new ContainersMonitorImpl(
|
|
|
+ mock(ContainerExecutor.class), mockDispatcher, ctx);
|
|
|
+ monitor.init(monitorConf);
|
|
|
+ monitor.start();
|
|
|
+ Event event;
|
|
|
+ try {
|
|
|
+ ctx.getApplications().put(appId, app);
|
|
|
+ ctx.getContainers().put(cid, container);
|
|
|
+ monitor.handle(new ContainerStartMonitoringEvent(cid, 1, 1, 1, 0, 0));
|
|
|
+
|
|
|
+ PrintWriter fileWriter = new PrintWriter(new File(clogDir1, "log"));
|
|
|
+ fileWriter.write("0123456789");
|
|
|
+ fileWriter.close();
|
|
|
+
|
|
|
+ Thread.sleep(1000);
|
|
|
+ verify(mockHandler, never()).handle(any(Event.class));
|
|
|
+
|
|
|
+ fileWriter = new PrintWriter(new File(clogDir2, "log"));
|
|
|
+ fileWriter.write("0123456789");
|
|
|
+ fileWriter.close();
|
|
|
+
|
|
|
+ ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
|
|
|
+ verify(mockHandler, timeout(10000)).handle(captor.capture());
|
|
|
+ event = captor.getValue();
|
|
|
+ } finally {
|
|
|
+ monitor.stop();
|
|
|
+ }
|
|
|
+
|
|
|
+ assertTrue("Expected a kill event", event instanceof ContainerKillEvent);
|
|
|
+ ContainerKillEvent cke = (ContainerKillEvent) event;
|
|
|
+ assertEquals("Unexpected container exit status",
|
|
|
+ ContainerExitStatus.KILLED_FOR_EXCESS_LOGS,
|
|
|
+ cke.getContainerExitStatus());
|
|
|
+ }
|
|
|
+
|
|
|
@Test(timeout = 20000)
|
|
|
public void testContainerMonitorMemFlags() {
|
|
|
ContainersMonitor cm = null;
|