فهرست منبع

Merge -c 1195792 from trunk to branch-0.23 to fix MAPREDUCE-3237.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1195793 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 13 سال پیش
والد
کامیت
e6f85573e7
13فایلهای تغییر یافته به همراه386 افزوده شده و 72 حذف شده
  1. 3 0
      hadoop-mapreduce-project/CHANGES.txt
  2. 0 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java
  3. 215 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
  4. 50 56
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
  5. 98 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java
  6. 14 0
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
  7. 0 11
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
  8. 1 1
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
  9. 2 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java
  10. 1 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
  11. 1 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
  12. 1 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
  13. 0 1
      hadoop-mapreduce-project/src/java/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -1864,6 +1864,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3192. Fix Javadoc warning in JobClient.java and Cluster.java.
     (jitendra)
 
+    MAPREDUCE-3237. Move LocalJobRunner to hadoop-mapreduce-client-core.
+    (tomwhite via acmurthy) 
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

+ 0 - 0
hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java → hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java


+ 215 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java

@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import com.google.common.collect.Maps;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.FSDownload;
+
+/**
+ * A helper class for managing the distributed cache for {@link LocalJobRunner}.
+ */
+@SuppressWarnings("deprecation")
+class LocalDistributedCacheManager {
+  public static final Log LOG =
+    LogFactory.getLog(LocalDistributedCacheManager.class);
+  
+  private List<String> localArchives = new ArrayList<String>();
+  private List<String> localFiles = new ArrayList<String>();
+  private List<String> localClasspaths = new ArrayList<String>();
+  
+  private boolean setupCalled = false;
+  
+  /**
+   * Set up the distributed cache by localizing the resources, and updating
+   * the configuration with references to the localized resources.
+   * @param conf
+   * @throws IOException
+   */
+  public void setup(JobConf conf) throws IOException {
+    // Generate YARN local resources objects corresponding to the distributed
+    // cache configuration
+    Map<String, LocalResource> localResources = 
+      new LinkedHashMap<String, LocalResource>();
+    MRApps.setupDistributedCache(conf, localResources);
+    
+    // Find which resources are to be put on the local classpath
+    Map<String, Path> classpaths = new HashMap<String, Path>();
+    Path[] archiveClassPaths = DistributedCache.getArchiveClassPaths(conf);
+    if (archiveClassPaths != null) {
+      for (Path p : archiveClassPaths) {
+        FileSystem remoteFS = p.getFileSystem(conf);
+        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+            remoteFS.getWorkingDirectory()));
+        classpaths.put(p.toUri().getPath().toString(), p);
+      }
+    }
+    Path[] fileClassPaths = DistributedCache.getFileClassPaths(conf);
+    if (fileClassPaths != null) {
+      for (Path p : fileClassPaths) {
+        FileSystem remoteFS = p.getFileSystem(conf);
+        p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+            remoteFS.getWorkingDirectory()));
+        classpaths.put(p.toUri().getPath().toString(), p);
+      }
+    }
+    
+    // Localize the resources
+    LocalDirAllocator localDirAllocator =
+      new LocalDirAllocator(MRConfig.LOCAL_DIR);
+    FileContext localFSFileContext = FileContext.getLocalFSFileContext();
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    
+    Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
+    ExecutorService exec = Executors.newCachedThreadPool();
+    for (LocalResource resource : localResources.values()) {
+      Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
+          localDirAllocator, resource, new Random());
+      Future<Path> future = exec.submit(download);
+      resourcesToPaths.put(resource, future);
+    }
+    for (LocalResource resource : localResources.values()) {
+      Path path;
+      try {
+        path = resourcesToPaths.get(resource).get();
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+      String pathString = path.toUri().toString();
+      if (resource.getType() == LocalResourceType.ARCHIVE) {
+        localArchives.add(pathString);
+      } else if (resource.getType() == LocalResourceType.FILE) {
+        localFiles.add(pathString);
+      }
+      Path resourcePath;
+      try {
+        resourcePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
+      } catch (URISyntaxException e) {
+        throw new IOException(e);
+      }
+      LOG.info(String.format("Localized %s as %s", resourcePath, path));
+      String cp = resourcePath.toUri().getPath();
+      if (classpaths.keySet().contains(cp)) {
+        localClasspaths.add(path.toUri().getPath().toString());
+      }
+    }
+    
+    // Update the configuration object with localized data.
+    if (!localArchives.isEmpty()) {
+      conf.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
+          .arrayToString(localArchives.toArray(new String[localArchives
+              .size()])));
+    }
+    if (!localFiles.isEmpty()) {
+      conf.set(MRJobConfig.CACHE_LOCALFILES, StringUtils
+          .arrayToString(localFiles.toArray(new String[localArchives
+              .size()])));
+    }
+    if (DistributedCache.getSymlink(conf)) {
+      // This is not supported largely because, 
+      // for a Child subprocess, the cwd in LocalJobRunner
+      // is not a fresh slate, but rather the user's working directory.
+      // This is further complicated because the logic in
+      // setupWorkDir only creates symlinks if there's a jarfile
+      // in the configuration.
+      LOG.warn("LocalJobRunner does not support " +
+          "symlinking into current working dir.");
+    }
+    setupCalled = true;
+  }
+
+  /** 
+   * Are the resources that should be added to the classpath? 
+   * Should be called after setup().
+   * 
+   */
+  public boolean hasLocalClasspaths() {
+    if (!setupCalled) {
+      throw new IllegalStateException(
+          "hasLocalClasspaths() should be called after setup()");
+    }
+    return !localClasspaths.isEmpty();
+  }
+  
+  /**
+   * Creates a class loader that includes the designated
+   * files and archives.
+   */
+  public ClassLoader makeClassLoader(final ClassLoader parent)
+      throws MalformedURLException {
+    final URL[] urls = new URL[localClasspaths.size()];
+    for (int i = 0; i < localClasspaths.size(); ++i) {
+      urls[i] = new File(localClasspaths.get(i)).toURI().toURL();
+      LOG.info(urls[i]);
+    }
+    return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
+      @Override
+      public ClassLoader run() {
+        return new URLClassLoader(urls, parent);
+      }
+    });
+  }
+
+  public void close() throws IOException {
+    FileContext localFSFileContext = FileContext.getLocalFSFileContext();
+    for (String archive : localArchives) {
+      localFSFileContext.delete(new Path(archive), true);
+    }
+    for (String file : localFiles) {
+      localFSFileContext.delete(new Path(file), true);
+    }
+  }
+}

+ 50 - 56
hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LocalJobRunner.java → hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -27,8 +26,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -38,28 +37,23 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.QueueInfo;
 import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
-import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.mapreduce.v2.LogParams;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.mapreduce.server.jobtracker.State;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
@@ -67,6 +61,7 @@ import org.apache.hadoop.security.token.Token;
 /** Implements MapReduce locally, in-process, for debugging. */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
+@SuppressWarnings("deprecation")
 public class LocalJobRunner implements ClientProtocol {
   public static final Log LOG =
     LogFactory.getLog(LocalJobRunner.class);
@@ -82,7 +77,7 @@ public class LocalJobRunner implements ClientProtocol {
   private int reduce_tasks = 0;
   final Random rand = new Random();
   
-  private JobTrackerInstrumentation myMetrics = null;
+  private LocalJobRunnerMetrics myMetrics = null;
 
   private static final String jobDir =  "localRunner/";
 
@@ -125,8 +120,7 @@ public class LocalJobRunner implements ClientProtocol {
     private FileSystem localFs;
     boolean killed = false;
     
-    private TrackerDistributedCacheManager trackerDistributerdCacheManager;
-    private TaskDistributedCacheManager taskDistributedCacheManager;
+    private LocalDistributedCacheManager localDistributedCacheManager;
 
     public long getProtocolVersion(String protocol, long clientVersion) {
       return TaskUmbilicalProtocol.versionID;
@@ -150,27 +144,8 @@ public class LocalJobRunner implements ClientProtocol {
 
       // Manage the distributed cache.  If there are files to be copied,
       // this will trigger localFile to be re-written again.
-      this.trackerDistributerdCacheManager =
-          new TrackerDistributedCacheManager(conf, new DefaultTaskController());
-      this.taskDistributedCacheManager = 
-          trackerDistributerdCacheManager.newTaskDistributedCacheManager(conf);
-      taskDistributedCacheManager.setup(
-          new LocalDirAllocator(MRConfig.LOCAL_DIR), 
-          new File(systemJobDir.toString()),
-          "archive", "archive");
-      
-      if (DistributedCache.getSymlink(conf)) {
-        // This is not supported largely because, 
-        // for a Child subprocess, the cwd in LocalJobRunner
-        // is not a fresh slate, but rather the user's working directory.
-        // This is further complicated because the logic in
-        // setupWorkDir only creates symlinks if there's a jarfile
-        // in the configuration.
-        LOG.warn("LocalJobRunner does not support " +
-        		"symlinking into current working dir.");
-      }
-      // Setup the symlinks for the distributed cache.
-      TaskRunner.setupWorkDir(conf, new File(localJobDir.toUri()).getAbsoluteFile());
+      localDistributedCacheManager = new LocalDistributedCacheManager();
+      localDistributedCacheManager.setup(conf);
       
       // Write out configuration file.  Instead of copying it from
       // systemJobFile, we re-write it, since setup(), above, may have
@@ -184,8 +159,8 @@ public class LocalJobRunner implements ClientProtocol {
       this.job = new JobConf(localJobFile);
 
       // Job (the current object) is a Thread, so we wrap its class loader.
-      if (!taskDistributedCacheManager.getClassPaths().isEmpty()) {
-        setContextClassLoader(taskDistributedCacheManager.makeClassLoader(
+      if (localDistributedCacheManager.hasLocalClasspaths()) {
+        setContextClassLoader(localDistributedCacheManager.makeClassLoader(
                 getContextClassLoader()));
       }
       
@@ -200,10 +175,6 @@ public class LocalJobRunner implements ClientProtocol {
       this.start();
     }
 
-    JobProfile getProfile() {
-      return profile;
-    }
-
     /**
      * A Runnable instance that handles a map task to be run by an executor.
      */
@@ -239,7 +210,7 @@ public class LocalJobRunner implements ClientProtocol {
             info.getSplitIndex(), 1);
           map.setUser(UserGroupInformation.getCurrentUser().
               getShortUserName());
-          TaskRunner.setupChildMapredLocalDirs(map, localConf);
+          setupChildMapredLocalDirs(map, localConf);
 
           MapOutputFile mapOutput = new MROutputFiles();
           mapOutput.setConf(localConf);
@@ -333,7 +304,6 @@ public class LocalJobRunner implements ClientProtocol {
       return executor;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void run() {
       JobID jobId = profile.getJobID();
@@ -399,7 +369,7 @@ public class LocalJobRunner implements ClientProtocol {
                 getShortUserName());
             JobConf localConf = new JobConf(job);
             localConf.set("mapreduce.jobtracker.address", "local");
-            TaskRunner.setupChildMapredLocalDirs(reduce, localConf);
+            setupChildMapredLocalDirs(reduce, localConf);
             // move map output to reduce input  
             for (int i = 0; i < mapIds.size(); i++) {
               if (!this.isInterrupted()) {
@@ -473,8 +443,7 @@ public class LocalJobRunner implements ClientProtocol {
           fs.delete(systemJobFile.getParent(), true);  // delete submit dir
           localFs.delete(localJobFile, true);              // delete local copy
           // Cleanup distributed cache
-          taskDistributedCacheManager.release();
-          trackerDistributerdCacheManager.purgeCache();
+          localDistributedCacheManager.close();
         } catch (IOException e) {
           LOG.warn("Error cleaning up "+id+": "+e);
         }
@@ -593,7 +562,7 @@ public class LocalJobRunner implements ClientProtocol {
   public LocalJobRunner(JobConf conf) throws IOException {
     this.fs = FileSystem.getLocal(conf);
     this.conf = conf;
-    myMetrics = new JobTrackerMetricsInst(null, new JobConf(conf));
+    myMetrics = new LocalJobRunnerMetrics(new JobConf(conf));
   }
 
   // JobSubmissionProtocol methods
@@ -661,14 +630,6 @@ public class LocalJobRunner implements ClientProtocol {
         reduce_tasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
   }
 
-  /**
-   * @deprecated Use {@link #getJobTrackerStatus()} instead.
-   */
-  @Deprecated
-  public State getJobTrackerState() throws IOException, InterruptedException {
-    return State.RUNNING;
-  }
-  
   public JobTrackerStatus getJobTrackerStatus() {
     return JobTrackerStatus.RUNNING;
   }
@@ -723,7 +684,7 @@ public class LocalJobRunner implements ClientProtocol {
   }
 
   /**
-   * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getQueueAdmins()
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getQueueAdmins(String)
    */
   public AccessControlList getQueueAdmins(String queueName) throws IOException {
 	  return new AccessControlList(" ");// no queue admins for local job runner
@@ -820,4 +781,37 @@ public class LocalJobRunner implements ClientProtocol {
       throws IOException, InterruptedException {
     throw new UnsupportedOperationException("Not supported");
   }
+  
+  static void setupChildMapredLocalDirs(Task t, JobConf conf) {
+    String[] localDirs = conf.getTrimmedStrings(MRConfig.LOCAL_DIR);
+    String jobId = t.getJobID().toString();
+    String taskId = t.getTaskID().toString();
+    boolean isCleanup = t.isTaskCleanupTask();
+    String user = t.getUser();
+    StringBuffer childMapredLocalDir =
+        new StringBuffer(localDirs[0] + Path.SEPARATOR
+            + getLocalTaskDir(user, jobId, taskId, isCleanup));
+    for (int i = 1; i < localDirs.length; i++) {
+      childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
+          + getLocalTaskDir(user, jobId, taskId, isCleanup));
+    }
+    LOG.debug(MRConfig.LOCAL_DIR + " for child : " + childMapredLocalDir);
+    conf.set(MRConfig.LOCAL_DIR, childMapredLocalDir.toString());
+  }
+  
+  static final String TASK_CLEANUP_SUFFIX = ".cleanup";
+  static final String SUBDIR = jobDir;
+  static final String JOBCACHE = "jobcache";
+  
+  static String getLocalTaskDir(String user, String jobid, String taskid,
+      boolean isCleanupAttempt) {
+    String taskDir = SUBDIR + Path.SEPARATOR + user + Path.SEPARATOR + JOBCACHE
+      + Path.SEPARATOR + jobid + Path.SEPARATOR + taskid;
+    if (isCleanupAttempt) {
+      taskDir = taskDir + TASK_CLEANUP_SUFFIX;
+    }
+    return taskDir;
+  }
+  
+  
 }

+ 98 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java

@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+
+@SuppressWarnings("deprecation")
+class LocalJobRunnerMetrics implements Updater {
+  private final MetricsRecord metricsRecord;
+
+  private int numMapTasksLaunched = 0;
+  private int numMapTasksCompleted = 0;
+  private int numReduceTasksLaunched = 0;
+  private int numReduceTasksCompleted = 0;
+  private int numWaitingMaps = 0;
+  private int numWaitingReduces = 0;
+  
+  public LocalJobRunnerMetrics(JobConf conf) {
+    String sessionId = conf.getSessionId();
+    // Initiate JVM Metrics
+    JvmMetrics.init("JobTracker", sessionId);
+    // Create a record for map-reduce metrics
+    MetricsContext context = MetricsUtil.getContext("mapred");
+    // record name is jobtracker for compatibility 
+    metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
+    metricsRecord.setTag("sessionId", sessionId);
+    context.registerUpdater(this);
+  }
+    
+  /**
+   * Since this object is a registered updater, this method will be called
+   * periodically, e.g. every 5 seconds.
+   */
+  public void doUpdates(MetricsContext unused) {
+    synchronized (this) {
+      metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
+      metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
+      metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
+      metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
+      metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
+      metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
+
+      numMapTasksLaunched = 0;
+      numMapTasksCompleted = 0;
+      numReduceTasksLaunched = 0;
+      numReduceTasksCompleted = 0;
+      numWaitingMaps = 0;
+      numWaitingReduces = 0;
+    }
+    metricsRecord.update();
+  }
+
+  public synchronized void launchMap(TaskAttemptID taskAttemptID) {
+    ++numMapTasksLaunched;
+    decWaitingMaps(taskAttemptID.getJobID(), 1);
+  }
+
+  public synchronized void completeMap(TaskAttemptID taskAttemptID) {
+    ++numMapTasksCompleted;
+  }
+
+  public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
+    ++numReduceTasksLaunched;
+    decWaitingReduces(taskAttemptID.getJobID(), 1);
+  }
+
+  public synchronized void completeReduce(TaskAttemptID taskAttemptID) {
+    ++numReduceTasksCompleted;
+  }
+
+  private synchronized void decWaitingMaps(JobID id, int task) {
+    numWaitingMaps -= task;
+  }
+  
+  private synchronized void decWaitingReduces(JobID id, int task){
+    numWaitingReduces -= task;
+  }
+
+}

+ 14 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider

@@ -0,0 +1,14 @@
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+org.apache.hadoop.mapred.LocalClientProtocolProvider

+ 0 - 11
hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestMRWithDistributedCache.java → hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java

@@ -163,17 +163,6 @@ public class TestMRWithDistributedCache extends TestCase {
     testWithConf(c);
   }
 
-  /** Tests using a full MiniMRCluster. */
-  public void testMiniMRJobRunner() throws Exception {
-    MiniMRCluster m = new MiniMRCluster(1, "file:///", 1);
-    try {
-      testWithConf(m.createJobConf());
-    } finally {
-      m.shutdown();
-    }
-
-  }
-
   private Path createTempFile(String filename, String contents)
       throws IOException {
     Path path = new Path(TEST_ROOT_DIR, filename);

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -1166,7 +1166,7 @@
 
 <property>
   <name>mapreduce.framework.name</name>
-  <value>yarn</value>
+  <value>local</value>
   <description>The runtime framework for executing MapReduce jobs.
   Can be one of local, classic or yarn.
   </description>

+ 2 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/FSDownload.java → hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/FSDownload.java

@@ -16,7 +16,7 @@
 * limitations under the License.
 */
 
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+package org.apache.hadoop.yarn.util;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -65,7 +65,7 @@ public class FSDownload implements Callable<Path> {
   static final FsPermission PUBLIC_DIR_PERMS = new FsPermission((short) 0755);
   static final FsPermission PRIVATE_DIR_PERMS = new FsPermission((short) 0700);
 
-  FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
+  public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
       LocalDirAllocator dirs, LocalResource resource, Random rand) {
     this.conf = conf;
     this.dirs = dirs;

+ 1 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestFSDownload.java → hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java

@@ -16,7 +16,7 @@
 * limitations under the License.
 */
 
-package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
+package org.apache.hadoop.yarn.util;
 
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;

+ 1 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java

@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.secu
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.FSDownload;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 

+ 1 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java

@@ -110,6 +110,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProv
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.FSDownload;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 

+ 0 - 1
hadoop-mapreduce-project/src/java/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider

@@ -12,4 +12,3 @@
 #   limitations under the License.
 #
 org.apache.hadoop.mapred.JobTrackerClientProtocolProvider
-org.apache.hadoop.mapred.LocalClientProtocolProvider