|
@@ -19,30 +19,17 @@
|
|
package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
|
|
package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
|
|
|
|
|
|
import static org.junit.Assert.assertEquals;
|
|
import static org.junit.Assert.assertEquals;
|
|
-import static org.junit.Assert.assertTrue;
|
|
|
|
import static org.junit.Assert.fail;
|
|
import static org.junit.Assert.fail;
|
|
|
|
|
|
-import java.io.File;
|
|
|
|
-import java.io.FileWriter;
|
|
|
|
-import java.io.Writer;
|
|
|
|
-import java.net.HttpURLConnection;
|
|
|
|
-import java.net.URI;
|
|
|
|
-import java.net.URL;
|
|
|
|
-import java.util.ArrayList;
|
|
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
-import java.util.List;
|
|
|
|
import java.util.Properties;
|
|
import java.util.Properties;
|
|
|
|
|
|
import javax.servlet.FilterConfig;
|
|
import javax.servlet.FilterConfig;
|
|
import javax.servlet.ServletException;
|
|
import javax.servlet.ServletException;
|
|
-import javax.servlet.http.HttpServletResponse;
|
|
|
|
import javax.ws.rs.core.MediaType;
|
|
import javax.ws.rs.core.MediaType;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
-import org.apache.hadoop.fs.FileSystem;
|
|
|
|
-import org.apache.hadoop.fs.Path;
|
|
|
|
-import org.apache.hadoop.security.UserGroupInformation;
|
|
|
|
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
|
|
import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
|
|
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
|
|
import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
|
|
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
|
|
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
|
|
@@ -55,8 +42,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
|
|
|
|
-import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
|
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryClientService;
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryClientService;
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManagerOnTimelineStore;
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManagerOnTimelineStore;
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.TestApplicationHistoryManagerOnTimelineStore;
|
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.TestApplicationHistoryManagerOnTimelineStore;
|
|
@@ -96,17 +81,12 @@ import com.sun.jersey.test.framework.WebAppDescriptor;
|
|
public class TestAHSWebServices extends JerseyTestBase {
|
|
public class TestAHSWebServices extends JerseyTestBase {
|
|
|
|
|
|
private static ApplicationHistoryClientService historyClientService;
|
|
private static ApplicationHistoryClientService historyClientService;
|
|
- private static AHSWebServices ahsWebservice;
|
|
|
|
private static final String[] USERS = new String[] { "foo" , "bar" };
|
|
private static final String[] USERS = new String[] { "foo" , "bar" };
|
|
private static final int MAX_APPS = 5;
|
|
private static final int MAX_APPS = 5;
|
|
- private static Configuration conf;
|
|
|
|
- private static FileSystem fs;
|
|
|
|
- private static final String remoteLogRootDir = "target/logs/";
|
|
|
|
- private static final String rootLogDir = "target/LocalLogs";
|
|
|
|
|
|
|
|
@BeforeClass
|
|
@BeforeClass
|
|
public static void setupClass() throws Exception {
|
|
public static void setupClass() throws Exception {
|
|
- conf = new YarnConfiguration();
|
|
|
|
|
|
+ Configuration conf = new YarnConfiguration();
|
|
TimelineStore store =
|
|
TimelineStore store =
|
|
TestApplicationHistoryManagerOnTimelineStore.createStore(MAX_APPS);
|
|
TestApplicationHistoryManagerOnTimelineStore.createStore(MAX_APPS);
|
|
TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
|
|
TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
|
|
@@ -115,8 +95,6 @@ public class TestAHSWebServices extends JerseyTestBase {
|
|
new TimelineDataManager(store, aclsManager);
|
|
new TimelineDataManager(store, aclsManager);
|
|
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
|
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
|
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "foo");
|
|
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "foo");
|
|
- conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
|
|
|
|
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir);
|
|
|
|
dataManager.init(conf);
|
|
dataManager.init(conf);
|
|
ApplicationACLsManager appAclsManager = new ApplicationACLsManager(conf);
|
|
ApplicationACLsManager appAclsManager = new ApplicationACLsManager(conf);
|
|
ApplicationHistoryManagerOnTimelineStore historyManager =
|
|
ApplicationHistoryManagerOnTimelineStore historyManager =
|
|
@@ -130,8 +108,6 @@ public class TestAHSWebServices extends JerseyTestBase {
|
|
};
|
|
};
|
|
historyClientService.init(conf);
|
|
historyClientService.init(conf);
|
|
historyClientService.start();
|
|
historyClientService.start();
|
|
- ahsWebservice = new AHSWebServices(historyClientService, conf);
|
|
|
|
- fs = FileSystem.get(conf);
|
|
|
|
}
|
|
}
|
|
|
|
|
|
@AfterClass
|
|
@AfterClass
|
|
@@ -139,8 +115,6 @@ public class TestAHSWebServices extends JerseyTestBase {
|
|
if (historyClientService != null) {
|
|
if (historyClientService != null) {
|
|
historyClientService.stop();
|
|
historyClientService.stop();
|
|
}
|
|
}
|
|
- fs.delete(new Path(remoteLogRootDir), true);
|
|
|
|
- fs.delete(new Path(rootLogDir), true);
|
|
|
|
}
|
|
}
|
|
|
|
|
|
@Parameterized.Parameters
|
|
@Parameterized.Parameters
|
|
@@ -153,7 +127,7 @@ public class TestAHSWebServices extends JerseyTestBase {
|
|
@Override
|
|
@Override
|
|
protected void configureServlets() {
|
|
protected void configureServlets() {
|
|
bind(JAXBContextResolver.class);
|
|
bind(JAXBContextResolver.class);
|
|
- bind(AHSWebServices.class).toInstance(ahsWebservice);;
|
|
|
|
|
|
+ bind(AHSWebServices.class);
|
|
bind(GenericExceptionHandler.class);
|
|
bind(GenericExceptionHandler.class);
|
|
bind(ApplicationBaseProtocol.class).toInstance(historyClientService);
|
|
bind(ApplicationBaseProtocol.class).toInstance(historyClientService);
|
|
serve("/*").with(GuiceContainer.class);
|
|
serve("/*").with(GuiceContainer.class);
|
|
@@ -497,177 +471,4 @@ public class TestAHSWebServices extends JerseyTestBase {
|
|
assertEquals(ContainerState.COMPLETE.toString(),
|
|
assertEquals(ContainerState.COMPLETE.toString(),
|
|
container.getString("containerState"));
|
|
container.getString("containerState"));
|
|
}
|
|
}
|
|
-
|
|
|
|
- @Test(timeout = 10000)
|
|
|
|
- public void testContainerLogsForFinishedApps() throws Exception {
|
|
|
|
- String fileName = "syslog";
|
|
|
|
- String user = "user1";
|
|
|
|
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user1");
|
|
|
|
- NodeId nodeId = NodeId.newInstance("test host", 100);
|
|
|
|
- NodeId nodeId2 = NodeId.newInstance("host2", 1234);
|
|
|
|
- //prepare the logs for remote directory
|
|
|
|
- ApplicationId appId = ApplicationId.newInstance(0, 1);
|
|
|
|
- // create local logs
|
|
|
|
- List<String> rootLogDirList = new ArrayList<String>();
|
|
|
|
- rootLogDirList.add(rootLogDir);
|
|
|
|
- Path rootLogDirPath = new Path(rootLogDir);
|
|
|
|
- if (fs.exists(rootLogDirPath)) {
|
|
|
|
- fs.delete(rootLogDirPath, true);
|
|
|
|
- }
|
|
|
|
- assertTrue(fs.mkdirs(rootLogDirPath));
|
|
|
|
-
|
|
|
|
- Path appLogsDir = new Path(rootLogDirPath, appId.toString());
|
|
|
|
- if (fs.exists(appLogsDir)) {
|
|
|
|
- fs.delete(appLogsDir, true);
|
|
|
|
- }
|
|
|
|
- assertTrue(fs.mkdirs(appLogsDir));
|
|
|
|
-
|
|
|
|
- // create container logs in local log file dir
|
|
|
|
- // create two container log files. We can get containerInfo
|
|
|
|
- // for container1 from AHS, but can not get such info for
|
|
|
|
- // container100
|
|
|
|
- ApplicationAttemptId appAttemptId =
|
|
|
|
- ApplicationAttemptId.newInstance(appId, 1);
|
|
|
|
- ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
|
|
|
|
- ContainerId containerId100 = ContainerId.newContainerId(appAttemptId, 100);
|
|
|
|
- createContainerLogInLocalDir(appLogsDir, containerId1, fs, fileName,
|
|
|
|
- ("Hello." + containerId1));
|
|
|
|
- createContainerLogInLocalDir(appLogsDir, containerId100, fs, fileName,
|
|
|
|
- ("Hello." + containerId100));
|
|
|
|
-
|
|
|
|
- // upload container logs to remote log dir
|
|
|
|
- Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR) +
|
|
|
|
- user + "/logs/" + appId.toString());
|
|
|
|
- if (fs.exists(path)) {
|
|
|
|
- fs.delete(path, true);
|
|
|
|
- }
|
|
|
|
- assertTrue(fs.mkdirs(path));
|
|
|
|
- uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId,
|
|
|
|
- containerId1, path, fs);
|
|
|
|
- uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId2,
|
|
|
|
- containerId100, path, fs);
|
|
|
|
-
|
|
|
|
- // test whether we can find container log from remote diretory if
|
|
|
|
- // the containerInfo for this container could be fetched from AHS.
|
|
|
|
- WebResource r = resource();
|
|
|
|
- ClientResponse response = r.path("ws").path("v1")
|
|
|
|
- .path("applicationhistory").path("containerlogs")
|
|
|
|
- .path(containerId1.toString()).path(fileName)
|
|
|
|
- .queryParam("user.name", user)
|
|
|
|
- .accept(MediaType.TEXT_PLAIN)
|
|
|
|
- .get(ClientResponse.class);
|
|
|
|
- String responseText = response.getEntity(String.class);
|
|
|
|
- assertTrue(responseText.contains("Hello." + containerId1));
|
|
|
|
-
|
|
|
|
- // test whether we can find container log from remote diretory if
|
|
|
|
- // the containerInfo for this container could not be fetched from AHS.
|
|
|
|
- r = resource();
|
|
|
|
- response = r.path("ws").path("v1")
|
|
|
|
- .path("applicationhistory").path("containerlogs")
|
|
|
|
- .path(containerId100.toString()).path(fileName)
|
|
|
|
- .queryParam("user.name", user)
|
|
|
|
- .accept(MediaType.TEXT_PLAIN)
|
|
|
|
- .get(ClientResponse.class);
|
|
|
|
- responseText = response.getEntity(String.class);
|
|
|
|
- assertTrue(responseText.contains("Hello." + containerId100));
|
|
|
|
-
|
|
|
|
- // create an application which can not be found from AHS
|
|
|
|
- ApplicationId appId100 = ApplicationId.newInstance(0, 100);
|
|
|
|
- appLogsDir = new Path(rootLogDirPath, appId100.toString());
|
|
|
|
- if (fs.exists(appLogsDir)) {
|
|
|
|
- fs.delete(appLogsDir, true);
|
|
|
|
- }
|
|
|
|
- assertTrue(fs.mkdirs(appLogsDir));
|
|
|
|
- ApplicationAttemptId appAttemptId100 =
|
|
|
|
- ApplicationAttemptId.newInstance(appId100, 1);
|
|
|
|
- ContainerId containerId1ForApp100 = ContainerId
|
|
|
|
- .newContainerId(appAttemptId100, 1);
|
|
|
|
- createContainerLogInLocalDir(appLogsDir, containerId1ForApp100, fs,
|
|
|
|
- fileName, ("Hello." + containerId1ForApp100));
|
|
|
|
- path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR) +
|
|
|
|
- user + "/logs/" + appId100.toString());
|
|
|
|
- if (fs.exists(path)) {
|
|
|
|
- fs.delete(path, true);
|
|
|
|
- }
|
|
|
|
- assertTrue(fs.mkdirs(path));
|
|
|
|
- uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId2,
|
|
|
|
- containerId1ForApp100, path, fs);
|
|
|
|
- r = resource();
|
|
|
|
- response = r.path("ws").path("v1")
|
|
|
|
- .path("applicationhistory").path("containerlogs")
|
|
|
|
- .path(containerId1ForApp100.toString()).path(fileName)
|
|
|
|
- .queryParam("user.name", user)
|
|
|
|
- .accept(MediaType.TEXT_PLAIN)
|
|
|
|
- .get(ClientResponse.class);
|
|
|
|
- responseText = response.getEntity(String.class);
|
|
|
|
- assertTrue(responseText.contains("Hello." + containerId1ForApp100));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private static void createContainerLogInLocalDir(Path appLogsDir,
|
|
|
|
- ContainerId containerId, FileSystem fs, String fileName, String content)
|
|
|
|
- throws Exception {
|
|
|
|
- Path containerLogsDir = new Path(appLogsDir, containerId.toString());
|
|
|
|
- if (fs.exists(containerLogsDir)) {
|
|
|
|
- fs.delete(containerLogsDir, true);
|
|
|
|
- }
|
|
|
|
- assertTrue(fs.mkdirs(containerLogsDir));
|
|
|
|
- Writer writer =
|
|
|
|
- new FileWriter(new File(containerLogsDir.toString(), fileName));
|
|
|
|
- writer.write(content);
|
|
|
|
- writer.close();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
|
|
|
|
- Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
|
|
|
|
- ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
|
|
|
|
- Path path =
|
|
|
|
- new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
|
|
|
|
- AggregatedLogFormat.LogWriter writer =
|
|
|
|
- new AggregatedLogFormat.LogWriter(configuration, path, ugi);
|
|
|
|
- writer.writeApplicationOwner(ugi.getUserName());
|
|
|
|
-
|
|
|
|
- writer.append(new AggregatedLogFormat.LogKey(containerId),
|
|
|
|
- new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
|
|
|
|
- ugi.getShortUserName()));
|
|
|
|
- writer.close();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Test(timeout = 10000)
|
|
|
|
- public void testContainerLogsForRunningApps() throws Exception {
|
|
|
|
- String fileName = "syslog";
|
|
|
|
- String user = "user1";
|
|
|
|
- ApplicationId appId = ApplicationId.newInstance(
|
|
|
|
- 1234, 1);
|
|
|
|
- ApplicationAttemptId appAttemptId =
|
|
|
|
- ApplicationAttemptId.newInstance(appId, 1);
|
|
|
|
- ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
|
|
|
|
- WebResource r = resource();
|
|
|
|
- URI requestURI = r.path("ws").path("v1")
|
|
|
|
- .path("applicationhistory").path("containerlogs")
|
|
|
|
- .path(containerId1.toString()).path(fileName)
|
|
|
|
- .queryParam("user.name", user).getURI();
|
|
|
|
- String redirectURL = getRedirectURL(requestURI.toString());
|
|
|
|
- assertTrue(redirectURL != null);
|
|
|
|
- assertTrue(redirectURL.contains("test:1234"));
|
|
|
|
- assertTrue(redirectURL.contains("ws/v1/node/containerlogs"));
|
|
|
|
- assertTrue(redirectURL.contains(containerId1.toString()));
|
|
|
|
- assertTrue(redirectURL.contains("user.name=" + user));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private static String getRedirectURL(String url) {
|
|
|
|
- String redirectUrl = null;
|
|
|
|
- try {
|
|
|
|
- HttpURLConnection conn = (HttpURLConnection) new URL(url)
|
|
|
|
- .openConnection();
|
|
|
|
- // do not automatically follow the redirection
|
|
|
|
- // otherwise we get too many redirections exception
|
|
|
|
- conn.setInstanceFollowRedirects(false);
|
|
|
|
- if(conn.getResponseCode() == HttpServletResponse.SC_TEMPORARY_REDIRECT) {
|
|
|
|
- redirectUrl = conn.getHeaderField("Location");
|
|
|
|
- }
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- // throw new RuntimeException(e);
|
|
|
|
- }
|
|
|
|
- return redirectUrl;
|
|
|
|
- }
|
|
|
|
}
|
|
}
|