|
@@ -18,9 +18,20 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
|
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
|
|
|
|
|
|
-import static org.mockito.Mockito.*;
|
|
|
|
import static junit.framework.Assert.assertEquals;
|
|
import static junit.framework.Assert.assertEquals;
|
|
import static junit.framework.Assert.assertTrue;
|
|
import static junit.framework.Assert.assertTrue;
|
|
|
|
+import static org.mockito.Matchers.any;
|
|
|
|
+import static org.mockito.Matchers.anyMap;
|
|
|
|
+import static org.mockito.Matchers.eq;
|
|
|
|
+import static org.mockito.Matchers.isA;
|
|
|
|
+import static org.mockito.Mockito.atLeast;
|
|
|
|
+import static org.mockito.Mockito.doThrow;
|
|
|
|
+import static org.mockito.Mockito.mock;
|
|
|
|
+import static org.mockito.Mockito.never;
|
|
|
|
+import static org.mockito.Mockito.reset;
|
|
|
|
+import static org.mockito.Mockito.spy;
|
|
|
|
+import static org.mockito.Mockito.verify;
|
|
|
|
+import static org.mockito.Mockito.when;
|
|
|
|
|
|
import java.io.ByteArrayOutputStream;
|
|
import java.io.ByteArrayOutputStream;
|
|
import java.io.DataInputStream;
|
|
import java.io.DataInputStream;
|
|
@@ -44,17 +55,18 @@ import junit.framework.Assert;
|
|
|
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
import org.apache.commons.lang.StringUtils;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
|
+import org.apache.hadoop.fs.FileSystem;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
|
-import org.apache.hadoop.io.DataInputBuffer;
|
|
|
|
-import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
|
|
|
+import org.apache.hadoop.fs.permission.FsPermission;
|
|
import org.apache.hadoop.security.Credentials;
|
|
import org.apache.hadoop.security.Credentials;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.yarn.YarnException;
|
|
import org.apache.hadoop.yarn.YarnException;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
@@ -70,16 +82,15 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
|
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
|
|
-import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
|
|
|
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
|
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
|
|
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
|
|
|
|
+import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
|
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
|
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
|
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
|
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
|
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
|
|
@@ -390,7 +401,63 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
|
|
checkEvents(appEventHandler, expectedFinishedEvents, false, "getType", "getApplicationID");
|
|
checkEvents(appEventHandler, expectedFinishedEvents, false, "getType", "getApplicationID");
|
|
dispatcher.stop();
|
|
dispatcher.stop();
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testAppLogDirCreation() throws Exception {
|
|
|
|
+ final String logSuffix = "logs";
|
|
|
|
+ this.conf.set(YarnConfiguration.NM_LOG_DIRS,
|
|
|
|
+ localLogDir.getAbsolutePath());
|
|
|
|
+ this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
|
|
|
+ this.remoteRootLogDir.getAbsolutePath());
|
|
|
|
+ this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, logSuffix);
|
|
|
|
+
|
|
|
|
+ DrainDispatcher dispatcher = new DrainDispatcher();
|
|
|
|
+ dispatcher.init(this.conf);
|
|
|
|
+ dispatcher.start();
|
|
|
|
+
|
|
|
|
+ FileSystem fs = FileSystem.get(this.conf);
|
|
|
|
+ final FileSystem spyFs = spy(FileSystem.get(this.conf));
|
|
|
|
+
|
|
|
|
+ LogAggregationService aggSvc = new LogAggregationService(dispatcher,
|
|
|
|
+ this.context, this.delSrvc, super.dirsHandler) {
|
|
|
|
+ @Override
|
|
|
|
+ protected FileSystem getFileSystem(Configuration conf) {
|
|
|
|
+ return spyFs;
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+
|
|
|
|
+ aggSvc.init(this.conf);
|
|
|
|
+ aggSvc.start();
|
|
|
|
+
|
|
|
|
+ // start an application and verify user, suffix, and app dirs created
|
|
|
|
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
|
|
|
|
+ Path userDir = fs.makeQualified(new Path(
|
|
|
|
+ remoteRootLogDir.getAbsolutePath(), this.user));
|
|
|
|
+ Path suffixDir = new Path(userDir, logSuffix);
|
|
|
|
+ Path appDir = new Path(suffixDir, appId.toString());
|
|
|
|
+ aggSvc.handle(new LogHandlerAppStartedEvent(appId, this.user, null,
|
|
|
|
+ ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
|
|
|
|
+ verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class));
|
|
|
|
+ verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class));
|
|
|
|
+ verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class));
|
|
|
|
+
|
|
|
|
+ // start another application and verify only app dir created
|
|
|
|
+ ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2);
|
|
|
|
+ Path appDir2 = new Path(suffixDir, appId2.toString());
|
|
|
|
+ aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null,
|
|
|
|
+ ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
|
|
|
|
+ verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class));
|
|
|
|
+
|
|
|
|
+ // start another application with the app dir already created and verify
|
|
|
|
+ // we do not try to create it again
|
|
|
|
+ ApplicationId appId3 = BuilderUtils.newApplicationId(1, 3);
|
|
|
|
+ Path appDir3 = new Path(suffixDir, appId3.toString());
|
|
|
|
+ new File(appDir3.toUri().getPath()).mkdir();
|
|
|
|
+ aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null,
|
|
|
|
+ ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
|
|
|
|
+ verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class));
|
|
|
|
+ }
|
|
|
|
+
|
|
@Test
|
|
@Test
|
|
@SuppressWarnings("unchecked")
|
|
@SuppressWarnings("unchecked")
|
|
public void testLogAggregationInitFailsWithoutKillingNM() throws Exception {
|
|
public void testLogAggregationInitFailsWithoutKillingNM() throws Exception {
|