浏览代码

MAPREDUCE-2652. svn merge -c r1163585 --ignore-ancestry trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1163626 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 13 年之前
父节点
当前提交
464486bee3
共有 23 个文件被更改,包括 422 次插入44 次删除
  1. 4 1
      hadoop-mapreduce-project/CHANGES.txt
  2. 7 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
  3. 5 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java
  4. 45 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerLaunchedEvent.java
  5. 6 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEvent.java
  6. 20 8
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
  7. 3 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
  8. 20 6
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
  9. 3 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
  10. 5 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
  11. 7 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
  12. 5 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
  13. 6 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
  14. 2 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
  15. 2 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
  16. 64 8
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
  17. 10 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
  18. 9 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StartContainerResponse.java
  19. 108 6
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StartContainerResponsePBImpl.java
  20. 1 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
  21. 38 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
  22. 1 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  23. 51 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java

+ 4 - 1
hadoop-mapreduce-project/CHANGES.txt

@@ -224,6 +224,10 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2701. app/Job.java needs UGI for the user that launched it.
     (Robert Evans via mahadev)
 
+    MAPREDUCE-2652. Enabled multiple NMs to be runnable on a single node by
+    making shuffle service port to be truely configurable. (Robert Evans via
+    vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and
@@ -650,7 +654,6 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-279. Fix in MR-279 branch. Distributed cache bug fix to pass Terasort.
     (vinodkv)
      
-
     MAPREDUCE-279. Fix in MR-279 branch. Fix null pointer exception in kill task
     attempt (mahadev)
 

+ 7 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.AMConstants;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -198,9 +199,13 @@ public class LocalContainerLauncher extends AbstractService implements
           // after "launching," send launched event to task attempt to move
           // state from ASSIGNED to RUNNING (also nukes "remoteTask", so must
           // do getRemoteTask() call first)
+          
+          //There is no port number because we are not really talking to a task
+          // tracker.  The shuffle is just done through local files.  So the
+          // port number is set to -1 in this case.
           context.getEventHandler().handle(
-              new TaskAttemptEvent(attemptID,
-                  TaskAttemptEventType.TA_CONTAINER_LAUNCHED)); //FIXME:  race condition here?  or do we have same kind of lock on TA handler => MapTask can't send TA_UPDATE before TA_CONTAINER_LAUNCHED moves TA to RUNNING state?  (probably latter)
+              new TaskAttemptContainerLaunchedEvent(attemptID, -1));
+          //FIXME:  race condition here?  or do we have same kind of lock on TA handler => MapTask can't send TA_UPDATE before TA_CONTAINER_LAUNCHED moves TA to RUNNING state?  (probably latter)
 
           if (numMapTasks == 0) {
             doneWithMaps = true;

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java

@@ -63,4 +63,9 @@ public interface TaskAttempt {
    *  yet, returns 0.
    */
   long getFinishTime();
+
+  /**
+   * @return the port shuffle is on.
+   */
+  public int getShufflePort();
 }

+ 45 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerLaunchedEvent.java

@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+public class TaskAttemptContainerLaunchedEvent extends TaskAttemptEvent {
+  private int shufflePort;
+
+  /**
+   * Create a new TaskAttemptEvent.
+   * @param id the id of the task attempt
+   * @param shufflePort the port that shuffle is listening on.
+   */
+  public TaskAttemptContainerLaunchedEvent(TaskAttemptId id, int shufflePort) {
+    super(id, TaskAttemptEventType.TA_CONTAINER_LAUNCHED);
+    this.shufflePort = shufflePort;
+  }
+
+  
+  /**
+   * Get the port that the shuffle handler is listening on. This is only
+   * valid if the type of the event is TA_CONTAINER_LAUNCHED
+   * @return the port the shuffle handler is listening on.
+   */
+  public int getShufflePort() {
+    return shufflePort;
+  }
+}

+ 6 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEvent.java

@@ -28,7 +28,12 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 public class TaskAttemptEvent extends AbstractEvent<TaskAttemptEventType> {
 
   private TaskAttemptId attemptID;
-
+  
+  /**
+   * Create a new TaskAttemptEvent.
+   * @param id the id of the task attempt
+   * @param type the type of event that happened.
+   */
   public TaskAttemptEvent(TaskAttemptId id, TaskAttemptEventType type) {
     super(type);
     this.attemptID = id;
@@ -37,5 +42,4 @@ public class TaskAttemptEvent extends AbstractEvent<TaskAttemptEventType> {
   public TaskAttemptId getTaskAttemptID() {
     return attemptID;
   }
-
 }

+ 20 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java

@@ -43,7 +43,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceChildJVM;
-import org.apache.hadoop.mapred.ProgressSplitsBlock;
 import org.apache.hadoop.mapred.ShuffleHandler;
 import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskAttemptContextImpl;
@@ -65,7 +64,6 @@ import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.MRConstants;
 import org.apache.hadoop.mapreduce.v2.api.records.Counter;
-import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
 import org.apache.hadoop.mapreduce.v2.api.records.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -80,6 +78,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@@ -126,7 +125,6 @@ import org.apache.hadoop.yarn.util.RackResolver;
 /**
  * Implementation of TaskAttempt interface.
  */
-@SuppressWarnings("all")
 public abstract class TaskAttemptImpl implements
     org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
       EventHandler<TaskAttemptEvent> {
@@ -159,6 +157,7 @@ public abstract class TaskAttemptImpl implements
   private long launchTime;
   private long finishTime;
   private WrappedProgressSplitsBlock progressSplitBlock;
+  private int shufflePort = -1;
 
   private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
     new CleanupContainerTransition();
@@ -596,13 +595,10 @@ public abstract class TaskAttemptImpl implements
 
       // Add shuffle token
       LOG.info("Putting shuffle token in serviceData");
-      DataOutputBuffer jobToken_dob = new DataOutputBuffer();
-      jobToken.write(jobToken_dob);
       container
           .setServiceData(
               ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
-              ByteBuffer.wrap(jobToken_dob.getData(), 0,
-                  jobToken_dob.getLength()));
+              ShuffleHandler.serializeServiceData(jobToken));
 
       MRApps.addToClassPath(container.getAllEnv(), getInitialClasspath());
     } catch (IOException e) {
@@ -784,6 +780,17 @@ public abstract class TaskAttemptImpl implements
     }
   }
 
+
+  @Override
+  public int getShufflePort() {
+    readLock.lock();
+    try {
+      return shufflePort;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   /**If container Assigned then return the node's address, otherwise null.
    */
   @Override
@@ -1153,7 +1160,11 @@ public abstract class TaskAttemptImpl implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
     @Override
     public void transition(TaskAttemptImpl taskAttempt, 
-        TaskAttemptEvent event) {
+        TaskAttemptEvent evnt) {
+
+      TaskAttemptContainerLaunchedEvent event =
+        (TaskAttemptContainerLaunchedEvent) evnt;
+
       //set the launch time
       taskAttempt.launchTime = taskAttempt.clock.getTime();
       // register it to TaskAttemptListener so that it start listening
@@ -1186,6 +1197,7 @@ public abstract class TaskAttemptImpl implements
       //make remoteTask reference as null as it is no more needed
       //and free up the memory
       taskAttempt.remoteTask = null;
+      taskAttempt.shufflePort = event.getShufflePort();
       
       //tell the Task that attempt has started
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(

+ 3 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java

@@ -559,8 +559,9 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
     if (attempt.getNodeHttpAddress() != null) {
       TaskAttemptCompletionEvent tce = recordFactory.newRecordInstance(TaskAttemptCompletionEvent.class);
       tce.setEventId(-1);
-      //TODO: XXXXXX  hardcoded port
-      tce.setMapOutputServerAddress("http://" + attempt.getNodeHttpAddress().split(":")[0] + ":8080");
+      tce.setMapOutputServerAddress("http://"
+          + attempt.getNodeHttpAddress().split(":")[0] + ":"
+          + attempt.getShufflePort());
       tce.setStatus(status);
       tce.setAttemptId(attempt.getID());
       int runTime = 0;

+ 20 - 6
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.v2.app.launcher;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,11 +31,12 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.ShuffleHandler;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.AMConstants;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@@ -48,6 +50,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -179,6 +182,7 @@ public class ContainerLauncherImpl extends AbstractService implements
       this.event = event;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void run() {
       LOG.info("Processing the event " + event.toString());
@@ -208,15 +212,25 @@ public class ContainerLauncherImpl extends AbstractService implements
           StartContainerRequest startRequest = recordFactory
               .newRecordInstance(StartContainerRequest.class);
           startRequest.setContainerLaunchContext(containerLaunchContext);
-          proxy.startContainer(startRequest);
-
-          LOG.info("Returning from container-launch for " + taskAttemptID);
+          StartContainerResponse response = proxy.startContainer(startRequest);
+          ByteBuffer portInfo = response
+              .getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
+          int port = -1;
+          if(portInfo != null) {
+            port = ShuffleHandler.deserializeMetaData(portInfo);
+          }
+          LOG.info("Shuffle port returned by ContainerManager for "
+              + taskAttemptID + " : " + port);
+          
+          if(port < 0) {
+            throw new IllegalStateException("Invalid shuffle port number "
+                + port + " returned for " + taskAttemptID);
+          }
 
           // after launching, send launched event to task attempt to move
           // it from ASSIGNED to RUNNING state
           context.getEventHandler().handle(
-              new TaskAttemptEvent(taskAttemptID,
-                  TaskAttemptEventType.TA_CONTAINER_LAUNCHED));
+              new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
         } catch (Throwable t) {
           String message = "Container launch failed for " + containerID
               + " : " + StringUtils.stringifyException(t);

+ 3 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
@@ -295,8 +296,8 @@ public class RecoveryService extends CompositeService implements Recovery {
         TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event)
             .getTaskAttemptID();
         TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
-        actualHandler.handle(new TaskAttemptEvent(aId,
-            TaskAttemptEventType.TA_CONTAINER_LAUNCHED));
+        //TODO need to get the real port number MAPREDUCE-2666
+        actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId, -1));
         // send the status update event
         sendStatusUpdateEvent(aId, attInfo);
 

+ 5 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

@@ -52,6 +52,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
@@ -291,9 +292,11 @@ public class MRApp extends MRAppMaster {
     public void handle(ContainerLauncherEvent event) {
       switch (event.getType()) {
       case CONTAINER_REMOTE_LAUNCH:
+        //We are running locally so set the shuffle port to -1 
         getContext().getEventHandler().handle(
-            new TaskAttemptEvent(event.getTaskAttemptID(),
-                TaskAttemptEventType.TA_CONTAINER_LAUNCHED));
+            new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(),
+                -1)
+            );
         
         attemptLaunched(event.getTaskAttemptID());
         break;

+ 7 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java

@@ -25,6 +25,8 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.hadoop.mapred.ShuffleHandler;
 import org.apache.hadoop.mapreduce.FileSystemCounter;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobCounter;
@@ -200,6 +202,11 @@ public class MockJobs extends MockApps {
         return 0;
       }
 
+      @Override
+      public int getShufflePort() {
+        return ShuffleHandler.DEFAULT_SHUFFLE_PORT;
+      }
+
       @Override
       public Counters getCounters() {
         return report.getCounters();

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java

@@ -515,6 +515,11 @@ public class TestRuntimeEstimators {
       throw new UnsupportedOperationException("Not supported yet.");
     }
 
+    @Override
+    public int getShufflePort() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
     private float getCodeRuntime() {
       int taskIndex = myAttemptID.getTaskId().getId();
       int attemptIndex = myAttemptID.getId();

+ 6 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java

@@ -146,4 +146,10 @@ public class CompletedTaskAttempt implements TaskAttempt {
   public long getFinishTime() {
     return report.getFinishTime();
   }
+
+  @Override
+  public int getShufflePort() {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
 }

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java

@@ -72,6 +72,8 @@ public class MiniMRYarnCluster extends MiniYARNCluster {
     conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT,
         ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
         Service.class);
+    // Non-standard shuffle port
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 8083);
     conf.setClass(NMConfig.NM_CONTAINER_EXECUTOR_CLASS,
         DefaultContainerExecutor.class, ContainerExecutor.class);
 

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java

@@ -105,7 +105,8 @@ public class TestMRJobs {
 
     if (mrCluster == null) {
       mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName());
-      mrCluster.init(new Configuration());
+      Configuration conf = new Configuration();
+      mrCluster.init(conf);
       mrCluster.start();
     }
 

+ 64 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java

@@ -120,7 +120,8 @@ public class ShuffleHandler extends AbstractService
   private static final JobTokenSecretManager secretManager =
     new JobTokenSecretManager();
 
-  public static final String SHUFFLE_PORT = "mapreduce.shuffle.port";
+  public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
+  public static final int DEFAULT_SHUFFLE_PORT = 8080;
 
   @Metrics(about="Shuffle output metrics", context="mapred")
   static class ShuffleMetrics implements ChannelFutureListener {
@@ -155,15 +156,59 @@ public class ShuffleHandler extends AbstractService
     this(DefaultMetricsSystem.instance());
   }
 
+  /**
+   * Serialize the shuffle port into a ByteBuffer for use later on.
+   * @param port the port to be sent to the ApplciationMaster
+   * @return the serialized form of the port.
+   */
+  static ByteBuffer serializeMetaData(int port) throws IOException {
+    //TODO these bytes should be versioned
+    DataOutputBuffer port_dob = new DataOutputBuffer();
+    port_dob.writeInt(port);
+    return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
+  }
+
+  /**
+   * A helper function to deserialize the metadata returned by ShuffleHandler.
+   * @param meta the metadata returned by the ShuffleHandler
+   * @return the port the Shuffle Handler is listening on to serve shuffle data.
+   */
+  public static int deserializeMetaData(ByteBuffer meta) throws IOException {
+    //TODO this should be returning a class not just an int
+    DataInputByteBuffer in = new DataInputByteBuffer();
+    in.reset(meta);
+    int port = in.readInt();
+    return port;
+  }
+
+  /**
+   * A helper function to serialize the JobTokenIdentifier to be sent to the
+   * ShuffleHandler as ServiceData.
+   * @param jobToken the job token to be used for authentication of
+   * shuffle data requests.
+   * @return the serialized version of the jobToken.
+   */
+  public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken) throws IOException {
+    //TODO these bytes should be versioned
+    DataOutputBuffer jobToken_dob = new DataOutputBuffer();
+    jobToken.write(jobToken_dob);
+    return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength());
+  }
+
+  static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer secret) throws IOException {
+    DataInputByteBuffer in = new DataInputByteBuffer();
+    in.reset(secret);
+    Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
+    jt.readFields(in);
+    return jt;
+  }
+
   @Override
   public void initApp(String user, ApplicationId appId, ByteBuffer secret) {
     // TODO these bytes should be versioned
     try {
-      DataInputByteBuffer in = new DataInputByteBuffer();
-      in.reset(secret);
-      Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
-      jt.readFields(in);
-      // TODO: Once SHuffle is out of NM, this can use MR APIs
+      Token<JobTokenIdentifier> jt = deserializeServiceData(secret);
+       // TODO: Once SHuffle is out of NM, this can use MR APIs
       JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
       userRsrc.put(jobId.toString(), user);
       LOG.info("Added token for " + jobId.toString());
@@ -193,7 +238,7 @@ public class ShuffleHandler extends AbstractService
     Configuration conf = getConfig();
     ServerBootstrap bootstrap = new ServerBootstrap(selector);
     bootstrap.setPipelineFactory(new HttpPipelineFactory(conf));
-    port = conf.getInt("mapreduce.shuffle.port", 8080);
+    port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
     accepted.add(bootstrap.bind(new InetSocketAddress(port)));
     LOG.info(getName() + " listening on port " + port);
     super.start();
@@ -207,6 +252,17 @@ public class ShuffleHandler extends AbstractService
     super.stop();
   }
 
+  @Override
+  public synchronized ByteBuffer getMeta() {
+    try {
+      return serializeMetaData(port); 
+    } catch (IOException e) {
+      LOG.error("Error during getMeta", e);
+      // TODO add API to AuxiliaryServices to report failures
+      return null;
+    }
+  }
+
   Shuffle createShuffle() {
     return new Shuffle(getConfig());
   }
@@ -306,7 +362,7 @@ public class ShuffleHandler extends AbstractService
       HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
       try {
         verifyRequest(jobId, ctx, request, response,
-            new URL("http", "", 8080, reqUri));
+            new URL("http", "", port, reqUri));
       } catch (IOException e) {
         LOG.warn("Shuffle failure ", e);
         sendError(ctx, e.getMessage(), UNAUTHORIZED);

+ 10 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java

@@ -26,11 +26,21 @@ import static org.apache.hadoop.test.MetricsAsserts.*;
 import org.jboss.netty.channel.ChannelFuture;
 
 import org.junit.Test;
+import static org.junit.Assert.*;
 import static org.apache.hadoop.test.MockitoMaker.*;
 
 public class TestShuffleHandler {
   static final long MiB = 1024 * 1024;
 
+  @Test public void testSerializeMeta()  throws Exception {
+    assertEquals(1, ShuffleHandler.deserializeMetaData(
+        ShuffleHandler.serializeMetaData(1)));
+    assertEquals(-1, ShuffleHandler.deserializeMetaData(
+        ShuffleHandler.serializeMetaData(-1)));
+    assertEquals(8080, ShuffleHandler.deserializeMetaData(
+        ShuffleHandler.serializeMetaData(8080)));
+  }
+
   @Test public void testShuffleMetrics() throws Exception {
     MetricsSystem ms = new MetricsSystemImpl();
     ShuffleHandler sh = new ShuffleHandler(ms);

+ 9 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StartContainerResponse.java

@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.yarn.api.protocolrecords;
 
+import java.nio.ByteBuffer;
+import java.util.Map;
+
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.yarn.api.ContainerManager;
@@ -32,5 +35,11 @@ import org.apache.hadoop.yarn.api.ContainerManager;
 @Public
 @Stable
 public interface StartContainerResponse {
+  Map<String, ByteBuffer> getAllServiceResponse();
+  ByteBuffer getServiceResponse(String key);
 
+  void addAllServiceResponse(Map<String, ByteBuffer> serviceResponse);
+  void setServiceResponse(String key, ByteBuffer value);
+  void removeServiceResponse(String key);
+  void clearServiceResponse();
 }

+ 108 - 6
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StartContainerResponsePBImpl.java

@@ -19,17 +19,26 @@
 package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
 
 
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.List;
+
+
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
 import org.apache.hadoop.yarn.api.records.ProtoBase;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerResponseProto;
-
-
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto;
     
 public class StartContainerResponsePBImpl extends ProtoBase<StartContainerResponseProto> implements StartContainerResponse {
   StartContainerResponseProto proto = StartContainerResponseProto.getDefaultInstance();
   StartContainerResponseProto.Builder builder = null;
   boolean viaProto = false;
-  
+ 
+  private Map<String, ByteBuffer> serviceResponse = null;
+
   public StartContainerResponsePBImpl() {
     builder = StartContainerResponseProto.newBuilder();
   }
@@ -40,20 +49,113 @@ public class StartContainerResponsePBImpl extends ProtoBase<StartContainerRespon
   }
   
   public StartContainerResponseProto getProto() {
+    mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
     viaProto = true;
     return proto;
   }
 
+  private void mergeLocalToBuilder() {
+    if (this.serviceResponse != null) {
+      addServiceResponseToProto();
+    }
+  }
+  
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
   private void maybeInitBuilder() {
     if (viaProto || builder == null) {
       builder = StartContainerResponseProto.newBuilder(proto);
     }
     viaProto = false;
   }
-    
-  
-
+   
 
+  @Override
+  public Map<String, ByteBuffer> getAllServiceResponse() {
+    initServiceResponse();
+    return this.serviceResponse;
+  }
+  @Override
+  public ByteBuffer getServiceResponse(String key) {
+    initServiceResponse();
+    return this.serviceResponse.get(key);
+  }
+  
+  private void initServiceResponse() {
+    if (this.serviceResponse != null) {
+      return;
+    }
+    StartContainerResponseProtoOrBuilder p = viaProto ? proto : builder;
+    List<StringBytesMapProto> list = p.getServiceResponseList();
+    this.serviceResponse = new HashMap<String, ByteBuffer>();
 
+    for (StringBytesMapProto c : list) {
+      this.serviceResponse.put(c.getKey(), convertFromProtoFormat(c.getValue()));
+    }
+  }
+  
+  @Override
+  public void addAllServiceResponse(final Map<String, ByteBuffer> serviceResponse) {
+    if (serviceResponse == null)
+      return;
+    initServiceResponse();
+    this.serviceResponse.putAll(serviceResponse);
+  }
+  
+  private void addServiceResponseToProto() {
+    maybeInitBuilder();
+    builder.clearServiceResponse();
+    if (serviceResponse == null)
+      return;
+    Iterable<StringBytesMapProto> iterable = new Iterable<StringBytesMapProto>() {
+      
+      @Override
+      public Iterator<StringBytesMapProto> iterator() {
+        return new Iterator<StringBytesMapProto>() {
+          
+          Iterator<String> keyIter = serviceResponse.keySet().iterator();
+          
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+          
+          @Override
+          public StringBytesMapProto next() {
+            String key = keyIter.next();
+            return StringBytesMapProto.newBuilder().setKey(key).setValue(convertToProtoFormat(serviceResponse.get(key))).build();
+          }
+          
+          @Override
+          public boolean hasNext() {
+            return keyIter.hasNext();
+          }
+        };
+      }
+    };
+    builder.addAllServiceResponse(iterable);
+  }
+  @Override
+  public void setServiceResponse(String key, ByteBuffer val) {
+    initServiceResponse();
+    this.serviceResponse.put(key, val);
+  }
+  @Override
+  public void removeServiceResponse(String key) {
+    initServiceResponse();
+    this.serviceResponse.remove(key);
+  }
+  @Override
+  public void clearServiceResponse() {
+    initServiceResponse();
+    this.serviceResponse.clear();
+  }
 }  

+ 1 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto

@@ -143,6 +143,7 @@ message StartContainerRequestProto {
 }
 
 message StartContainerResponseProto {
+  repeated StringBytesMapProto service_response = 1;
 }
 
 message StopContainerRequestProto {

+ 38 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java

@@ -44,11 +44,14 @@ public class AuxServices extends AbstractService
   public static final String AUX_SERVICE_CLASS_FMT =
     "nodemanager.aux.service.%s.class";
   public final Map<String,AuxiliaryService> serviceMap;
+  public final Map<String,ByteBuffer> serviceMeta;
 
   public AuxServices() {
     super(AuxServices.class.getName());
     serviceMap =
       Collections.synchronizedMap(new HashMap<String,AuxiliaryService>());
+    serviceMeta =
+      Collections.synchronizedMap(new HashMap<String,ByteBuffer>());
     // Obtain services from configuration in init()
   }
 
@@ -63,6 +66,15 @@ public class AuxServices extends AbstractService
     return Collections.unmodifiableCollection(serviceMap.values());
   }
 
+  /**
+   * @return the meta data for all registered services, that have been started.
+   * If a service has not been started no metadata will be available. The key
+   * the the name of the service as defined in the configuration.
+   */
+  public Map<String, ByteBuffer> getMeta() {
+    return Collections.unmodifiableMap(serviceMeta);
+  }
+
   @Override
   public void init(Configuration conf) {
     Collection<String> auxNames = conf.getStringCollection(AUX_SERVICES);
@@ -75,7 +87,15 @@ public class AuxServices extends AbstractService
           throw new RuntimeException("No class defiend for " + sName);
         }
         AuxiliaryService s = ReflectionUtils.newInstance(sClass, conf);
-        // TODO better use use s.getName()?
+        // TODO better use s.getName()?
+        if(!sName.equals(s.getName())) {
+          LOG.warn("The Auxilurary Service named '"+sName+"' in the "
+                  +"configuration is for class "+sClass+" which has "
+                  +"a name of '"+s.getName()+"'. Because these are "
+                  +"not the same tools trying to send ServiceData and read "
+                  +"Service Meta Data may have issues unless the refer to "
+                  +"the name in the config.");
+        }
         addService(sName, s);
         s.init(conf);
       } catch (RuntimeException e) {
@@ -90,9 +110,15 @@ public class AuxServices extends AbstractService
   public void start() {
     // TODO fork(?) services running as configured user
     //      monitor for health, shutdown/restart(?) if any should die
-    for (Service service : serviceMap.values()) {
+    for (Map.Entry<String, AuxiliaryService> entry : serviceMap.entrySet()) {
+      AuxiliaryService service = entry.getValue();
+      String name = entry.getKey();
       service.start();
       service.register(this);
+      ByteBuffer meta = service.getMeta();
+      if(meta != null) {
+        serviceMeta.put(name, meta);
+      }
     }
     super.start();
   }
@@ -108,6 +134,7 @@ public class AuxServices extends AbstractService
           }
         }
         serviceMap.clear();
+        serviceMeta.clear();
       }
     } finally {
       super.stop();
@@ -146,6 +173,15 @@ public class AuxServices extends AbstractService
   public interface AuxiliaryService extends Service {
     void initApp(String user, ApplicationId appId, ByteBuffer data);
     void stopApp(ApplicationId appId);
+    /**
+     * Retreive metadata for this service.  This is likely going to be contact
+     * information so that applications can access the service remotely.  Ideally
+     * each service should provide a method to parse out the information to a usable
+     * class.  This will only be called after the services start method has finished.
+     * the result may be cached.
+     * @return metadata for this service that should be made avaiable to applications.
+     */
+    ByteBuffer getMeta();
   }
 
 }

+ 1 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -283,6 +283,7 @@ public class ContainerManagerImpl extends CompositeService implements
     dispatcher.getEventHandler().handle(new ApplicationInitEvent(container));
     StartContainerResponse response =
         recordFactory.newRecordInstance(StartContainerResponse.class);
+    response.addAllServiceResponse(auxiluaryServices.getMeta());
     metrics.launchedContainer();
     metrics.allocateContainer(launchContext.getResource());
     return response;

+ 51 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java

@@ -22,6 +22,7 @@ import org.junit.Test;
 import static org.junit.Assert.*;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -44,10 +45,16 @@ public class TestAuxServices {
     private final int expected_appId;
     private int remaining_init;
     private int remaining_stop;
+    private ByteBuffer meta = null;
+
     LightService(String name, char idef, int expected_appId) {
+      this(name, idef, expected_appId, null);
+    } 
+    LightService(String name, char idef, int expected_appId, ByteBuffer meta) {
       super(name);
       this.idef = idef;
       this.expected_appId = expected_appId;
+      this.meta = meta;
     }
     @Override
     public void init(Configuration conf) {
@@ -71,14 +78,18 @@ public class TestAuxServices {
     public void stopApp(ApplicationId appId) {
       assertEquals(expected_appId, appId.getId());
     }
+    @Override
+    public ByteBuffer getMeta() {
+      return meta;
+    }
   }
 
   static class ServiceA extends LightService {
-    public ServiceA() { super("A", 'A', 65); }
+    public ServiceA() { super("A", 'A', 65, ByteBuffer.wrap("A".getBytes())); }
   }
 
   static class ServiceB extends LightService {
-    public ServiceB() { super("B", 'B', 66); }
+    public ServiceB() { super("B", 'B', 66, ByteBuffer.wrap("B".getBytes())); }
   }
 
   @Test
@@ -139,6 +150,44 @@ public class TestAuxServices {
     }
   }
 
+
+  @Test
+  public void testAuxServicesMeta() {
+    Configuration conf = new Configuration();
+    conf.setStrings(AuxServices.AUX_SERVICES, new String[] { "Asrv", "Bsrv" });
+    conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, "Asrv"),
+        ServiceA.class, Service.class);
+    conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT, "Bsrv"),
+        ServiceB.class, Service.class);
+    final AuxServices aux = new AuxServices();
+    aux.init(conf);
+
+    int latch = 1;
+    for (Service s : aux.getServices()) {
+      assertEquals(INITED, s.getServiceState());
+      if (s instanceof ServiceA) { latch *= 2; }
+      else if (s instanceof ServiceB) { latch *= 3; }
+      else fail("Unexpected service type " + s.getClass());
+    }
+    assertEquals("Invalid mix of services", 6, latch);
+    aux.start();
+    for (Service s : aux.getServices()) {
+      assertEquals(STARTED, s.getServiceState());
+    }
+
+    Map<String, ByteBuffer> meta = aux.getMeta();
+    assertEquals(2, meta.size());
+    assertEquals("A", new String(meta.get("Asrv").array()));
+    assertEquals("B", new String(meta.get("Bsrv").array()));
+
+    aux.stop();
+    for (Service s : aux.getServices()) {
+      assertEquals(STOPPED, s.getServiceState());
+    }
+  }
+
+
+
   @Test
   public void testAuxUnexpectedStop() {
     Configuration conf = new Configuration();