Ver código fonte

YARN-1690. Made DistributedShell send timeline entities+events. Contributed by Mayank Bansal.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1579123 13f79535-47bb-0310-9956-ffa450edef68
Zhijie Shen 11 anos atrás
pai
commit
fd1c424548

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -175,6 +175,9 @@ Release 2.4.0 - UNRELEASED
 
     YARN-1705. Reset cluster-metrics on transition to standby. (Rohith via kasha)
 
+    YARN-1690. Made DistributedShell send timeline entities+events. (Mayank Bansal
+    via zjshen)
+
   IMPROVEMENTS
 
     YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via

+ 105 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java

@@ -45,6 +45,7 @@ import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -55,6 +56,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
@@ -80,7 +82,10 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
@@ -90,6 +95,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.LogManager;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -160,6 +166,18 @@ public class ApplicationMaster {
 
   private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
 
+  @VisibleForTesting
+  @Private
+  public static enum DSEvent {
+    DS_APP_ATTEMPT_START, DS_APP_ATTEMPT_END, DS_CONTAINER_START, DS_CONTAINER_END
+  }
+  
+  @VisibleForTesting
+  @Private
+  public static enum DSEntity {
+    DS_APP_ATTEMPT, DS_CONTAINER
+  }
+
   // Configuration
   private Configuration conf;
 
@@ -242,6 +260,9 @@ public class ApplicationMaster {
   // Launch threads
   private List<Thread> launchThreads = new ArrayList<Thread>();
 
+  // Timeline Client
+  private TimelineClient timelineClient;
+
   private final String linux_bash_command = "bash";
   private final String windows_command = "cmd /c";
 
@@ -261,7 +282,8 @@ public class ApplicationMaster {
       result = appMaster.finish();
     } catch (Throwable t) {
       LOG.fatal("Error running ApplicationMaster", t);
-      System.exit(1);
+      LogManager.shutdown();
+      ExitUtil.terminate(1, t);
     }
     if (result) {
       LOG.info("Application Master completed successfully. exiting");
@@ -316,7 +338,6 @@ public class ApplicationMaster {
    * @throws IOException
    */
   public boolean init(String[] args) throws ParseException, IOException {
-
     Options opts = new Options();
     opts.addOption("app_attempt_id", true,
         "App Attempt ID. Not to be used unless for testing purposes");
@@ -464,6 +485,11 @@ public class ApplicationMaster {
     requestPriority = Integer.parseInt(cliParser
         .getOptionValue("priority", "0"));
 
+    // Creating the Timeline Client
+    timelineClient = TimelineClient.createTimelineClient();
+    timelineClient.init(conf);
+    timelineClient.start();
+
     return true;
   }
 
@@ -485,6 +511,13 @@ public class ApplicationMaster {
   @SuppressWarnings({ "unchecked" })
   public void run() throws YarnException, IOException {
     LOG.info("Starting ApplicationMaster");
+    try {
+      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
+          DSEvent.DS_APP_ATTEMPT_START);
+    } catch (Exception e) {
+      LOG.error("App Attempt start event coud not be pulished for "
+          + appAttemptID.toString(), e);
+    }
 
     Credentials credentials =
         UserGroupInformation.getCurrentUser().getCredentials();
@@ -564,6 +597,13 @@ public class ApplicationMaster {
       amRMClient.addContainerRequest(containerAsk);
     }
     numRequestedContainers.set(numTotalContainersToRequest);
+    try {
+      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
+          DSEvent.DS_APP_ATTEMPT_END);
+    } catch (Exception e) {
+      LOG.error("App Attempt start event coud not be pulished for "
+          + appAttemptID.toString(), e);
+    }
   }
 
   @VisibleForTesting
@@ -668,6 +708,12 @@ public class ApplicationMaster {
           LOG.info("Container completed successfully." + ", containerId="
               + containerStatus.getContainerId());
         }
+        try {
+          publishContainerEndEvent(timelineClient, containerStatus);
+        } catch (Exception e) {
+          LOG.error("Container start event could not be pulished for "
+              + containerStatus.getContainerId().toString(), e);
+        }
       }
       
       // ask for more containers if any failed
@@ -782,6 +828,13 @@ public class ApplicationMaster {
       if (container != null) {
         applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
       }
+      try {
+        ApplicationMaster.publishContainerStartEvent(
+            applicationMaster.timelineClient, container);
+      } catch (Exception e) {
+        LOG.error("Container start event coud not be pulished for "
+            + container.getId().toString(), e);
+      }
     }
 
     @Override
@@ -968,4 +1021,54 @@ public class ApplicationMaster {
       org.apache.commons.io.IOUtils.closeQuietly(ds);
     }
   }
+  
+  private static void publishContainerStartEvent(TimelineClient timelineClient,
+      Container container) throws IOException, YarnException {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityId(container.getId().toString());
+    entity.setEntityType(DSEntity.DS_CONTAINER.toString());
+    entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser()
+        .toString());
+    TimelineEvent event = new TimelineEvent();
+    event.setTimestamp(System.currentTimeMillis());
+    event.setEventType(DSEvent.DS_CONTAINER_START.toString());
+    event.addEventInfo("Node", container.getNodeId().toString());
+    event.addEventInfo("Resources", container.getResource().toString());
+    entity.addEvent(event);
+
+    timelineClient.putEntities(entity);
+  }
+
+  private static void publishContainerEndEvent(TimelineClient timelineClient,
+      ContainerStatus container) throws IOException, YarnException {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityId(container.getContainerId().toString());
+    entity.setEntityType(DSEntity.DS_CONTAINER.toString());
+    entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser()
+        .toString());
+    TimelineEvent event = new TimelineEvent();
+    event.setTimestamp(System.currentTimeMillis());
+    event.setEventType(DSEvent.DS_CONTAINER_END.toString());
+    event.addEventInfo("State", container.getState().name());
+    event.addEventInfo("Exit Status", container.getExitStatus());
+    entity.addEvent(event);
+
+    timelineClient.putEntities(entity);
+  }
+
+  private static void publishApplicationAttemptEvent(
+      TimelineClient timelineClient, String appAttemptId, DSEvent appEvent)
+      throws IOException, YarnException {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setEntityId(appAttemptId);
+    entity.setEntityType(DSEntity.DS_APP_ATTEMPT.toString());
+    entity.addPrimaryFilter("user", UserGroupInformation.getCurrentUser()
+        .toString());
+    TimelineEvent event = new TimelineEvent();
+    event.setEventType(appEvent.toString());
+    event.setTimestamp(System.currentTimeMillis());
+    entity.addEvent(event);
+
+    timelineClient.putEntities(entity);
+  }
 }

+ 37 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java

@@ -36,11 +36,14 @@ import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
@@ -68,6 +71,7 @@ public class TestDistributedShell {
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
     conf.setClass(YarnConfiguration.RM_SCHEDULER, 
         FifoScheduler.class, ResourceScheduler.class);
+    conf.set("yarn.log.dir", "target");
     if (yarnCluster == null) {
       yarnCluster = new MiniYARNCluster(
         TestDistributedShell.class.getSimpleName(), 1, 1, 1);
@@ -92,6 +96,12 @@ public class TestDistributedShell {
       os.write(bytesOut.toByteArray());
       os.close();
     }
+    FileContext fsContext = FileContext.getLocalFSFileContext();
+    fsContext
+        .delete(
+            new Path(conf
+                .get("yarn.timeline-service.leveldb-timeline-store.path")),
+            true);
     try {
       Thread.sleep(2000);
     } catch (InterruptedException e) {
@@ -108,6 +118,12 @@ public class TestDistributedShell {
         yarnCluster = null;
       }
     }
+    FileContext fsContext = FileContext.getLocalFSFileContext();
+    fsContext
+        .delete(
+            new Path(conf
+                .get("yarn.timeline-service.leveldb-timeline-store.path")),
+            true);
   }
   
   @Test(timeout=90000)
@@ -171,7 +187,27 @@ public class TestDistributedShell {
     t.join();
     LOG.info("Client run completed. Result=" + result);
     Assert.assertTrue(result.get());
-
+    
+    TimelineEntities entitiesAttempts = yarnCluster
+        .getApplicationHistoryServer()
+        .getTimelineStore()
+        .getEntities(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(),
+            null, null, null, null, null, null);
+    Assert.assertNotNull(entitiesAttempts);
+    Assert.assertEquals(1, entitiesAttempts.getEntities().size());
+    Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents()
+        .size());
+    Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType()
+        .toString(), ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString());
+    TimelineEntities entities = yarnCluster
+        .getApplicationHistoryServer()
+        .getTimelineStore()
+        .getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null,
+            null, null, null, null, null);
+    Assert.assertNotNull(entities);
+    Assert.assertEquals(2, entities.getEntities().size());
+    Assert.assertEquals(entities.getEntities().get(0).getEntityType()
+        .toString(), ApplicationMaster.DSEntity.DS_CONTAINER.toString());
   }
 
   @Test(timeout=90000)

+ 8 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java

@@ -169,5 +169,12 @@ public class ApplicationHistoryServer extends CompositeService {
       throw new YarnRuntimeException(msg, e);
     }
   }
-
+  /**
+   * @return ApplicationTimelineStore
+   */
+  @Private
+  @VisibleForTesting
+  public TimelineStore getTimelineStore() {
+    return timelineStore;
+  }
 }

+ 69 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
@@ -100,6 +101,9 @@ public class MiniYARNCluster extends CompositeService {
   private ResourceManager[] resourceManagers;
   private String[] rmIds;
 
+  private ApplicationHistoryServer appHistoryServer;
+  private ApplicationHistoryServerWrapper appHistoryServerWrapper;
+
   private boolean useFixedPorts;
   private boolean useRpc = false;
   private int failoverTimeout;
@@ -241,6 +245,8 @@ public class MiniYARNCluster extends CompositeService {
       addService(new NodeManagerWrapper(index));
     }
 
+    addService(new ApplicationHistoryServerWrapper());
+    
     super.serviceInit(
         conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
   }
@@ -649,4 +655,67 @@ public class MiniYARNCluster extends CompositeService {
     }
     return false;
   }
+  
+  private class ApplicationHistoryServerWrapper extends AbstractService {
+    public ApplicationHistoryServerWrapper() {
+      super(ApplicationHistoryServerWrapper.class.getName());
+    }
+
+    @Override
+    protected synchronized void serviceInit(Configuration conf)
+        throws Exception {
+      if (!conf.getBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS,
+          YarnConfiguration.DEFAULT_YARN_MINICLUSTER_FIXED_PORTS)) {
+        conf.set(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS);
+        conf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
+      }
+      appHistoryServer = new ApplicationHistoryServer();
+      appHistoryServer.init(conf);
+      super.serviceInit(conf);
+    }
+
+    @Override
+    protected synchronized void serviceStart() throws Exception {
+      try {
+        new Thread() {
+          public void run() {
+            appHistoryServer.start();
+          };
+        }.start();
+        int waitCount = 0;
+        while (appHistoryServer.getServiceState() == STATE.INITED
+            && waitCount++ < 60) {
+          LOG.info("Waiting for Timeline Server to start...");
+          Thread.sleep(1500);
+        }
+        if (appHistoryServer.getServiceState() != STATE.STARTED) {
+          // AHS could have failed.
+          throw new IOException(
+              "ApplicationHistoryServer failed to start. Final state is "
+                  + appHistoryServer.getServiceState());
+        }
+        super.serviceStart();
+      } catch (Throwable t) {
+        throw new YarnRuntimeException(t);
+      }
+      LOG.info("MiniYARN ApplicationHistoryServer address: "
+          + getConfig().get(YarnConfiguration.TIMELINE_SERVICE_ADDRESS));
+      LOG.info("MiniYARN ApplicationHistoryServer web address: "
+          + getConfig().get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS));
+    }
+
+    @Override
+    protected synchronized void serviceStop() throws Exception {
+      if (appHistoryServer != null) {
+        appHistoryServer.stop();
+      }
+      super.serviceStop();
+    }
+  }
+
+  public ApplicationHistoryServer getApplicationHistoryServer() {
+    return this.appHistoryServer;
+  }
 }