
MAPREDUCE-7294. Only application master should upload resource to Yarn Shared Cache (#2223)

Contributed by Zhenzhao Wang <zhenzhaowang@gmail.com>

Signed-off-by: Mingliang Liu <liuml07@apache.org>
zz committed 4 years ago
commit 0873555f04

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java

@@ -1423,7 +1423,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
    * be set up to false. In that way, the NMs that host the task containers
    * won't try to upload the resources to shared cache.
    */
-  private static void cleanupSharedCacheUploadPolicies(Configuration conf) {
+  @VisibleForTesting
+  static void cleanupSharedCacheUploadPolicies(Configuration conf) {
     Job.setArchiveSharedCacheUploadPolicies(conf, Collections.emptyMap());
     Job.setFileSharedCacheUploadPolicies(conf, Collections.emptyMap());
   }

+ 23 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java

@@ -39,6 +39,7 @@ import java.util.concurrent.CyclicBarrier;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -991,6 +992,28 @@ public class TestJobImpl {
     Assert.assertEquals(updatedPriority, jobPriority);
   }
 
+  @Test
+  public void testCleanupSharedCacheUploadPolicies() {
+    Configuration config = new Configuration();
+    Map<String, Boolean> archivePolicies = new HashMap<>();
+    archivePolicies.put("archive1", true);
+    archivePolicies.put("archive2", true);
+    Job.setArchiveSharedCacheUploadPolicies(config, archivePolicies);
+    Map<String, Boolean> filePolicies = new HashMap<>();
+    filePolicies.put("file1", true);
+    filePolicies.put("jar1", true);
+    Job.setFileSharedCacheUploadPolicies(config, filePolicies);
+    Assert.assertEquals(
+        2, Job.getArchiveSharedCacheUploadPolicies(config).size());
+    Assert.assertEquals(
+        2, Job.getFileSharedCacheUploadPolicies(config).size());
+    JobImpl.cleanupSharedCacheUploadPolicies(config);
+    Assert.assertEquals(
+        0, Job.getArchiveSharedCacheUploadPolicies(config).size());
+    Assert.assertEquals(
+        0, Job.getFileSharedCacheUploadPolicies(config).size());
+  }
+
   private static CommitterEventHandler createCommitterEventHandler(
       Dispatcher dispatcher, OutputCommitter committer) {
     final SystemClock clock = SystemClock.getInstance();

+ 14 - 19
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java

@@ -1448,26 +1448,21 @@ public class Job extends JobContextImpl implements JobContext, AutoCloseable {
    */
   private static void setSharedCacheUploadPolicies(Configuration conf,
       Map<String, Boolean> policies, boolean areFiles) {
-    if (policies != null) {
-      StringBuilder sb = new StringBuilder();
-      Iterator<Map.Entry<String, Boolean>> it = policies.entrySet().iterator();
-      Map.Entry<String, Boolean> e;
-      if (it.hasNext()) {
-        e = it.next();
-        sb.append(e.getKey() + DELIM + e.getValue());
-      } else {
-        // policies is an empty map, just skip setting the parameter
-        return;
-      }
-      while (it.hasNext()) {
-        e = it.next();
-        sb.append("," + e.getKey() + DELIM + e.getValue());
-      }
-      String confParam =
-          areFiles ? MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES
-              : MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
-      conf.set(confParam, sb.toString());
+    String confParam = areFiles ?
+        MRJobConfig.CACHE_FILES_SHARED_CACHE_UPLOAD_POLICIES :
+        MRJobConfig.CACHE_ARCHIVES_SHARED_CACHE_UPLOAD_POLICIES;
+    // If no policy is provided, we will reset the config by setting an empty
+    // string value. In other words, cleaning up existing policies. This is
+    // useful when we try to clean up shared cache upload policies for
+    // non-application master tasks. See MAPREDUCE-7294 for details.
+    if (policies == null || policies.size() == 0) {
+      conf.set(confParam, "");
+      return;
     }
+    StringBuilder sb = new StringBuilder();
+    policies.forEach((k,v) -> sb.append(k).append(DELIM).append(v).append(","));
+    sb.deleteCharAt(sb.length() - 1);
+    conf.set(confParam, sb.toString());
   }
 
   /**
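
For illustration, here is a minimal sketch (not part of the patch) of the behavioral change this refactor makes in setSharedCacheUploadPolicies: previously, passing an empty policy map was silently skipped, so upload policies already present in the configuration survived; with this change, a null or empty map resets the property to an empty string. The sketch only uses the public helpers that appear in the diff above (Job.setFileSharedCacheUploadPolicies and Job.getFileSharedCacheUploadPolicies); the class name is made up for the example.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SharedCachePolicyResetSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Simulate upload policies that were set earlier (e.g. on the client side).
    Map<String, Boolean> filePolicies = new HashMap<>();
    filePolicies.put("file1", true);
    Job.setFileSharedCacheUploadPolicies(conf, filePolicies);
    // The configuration now carries one file upload policy.
    System.out.println(Job.getFileSharedCacheUploadPolicies(conf).size()); // 1

    // With this patch, clearing with an empty map actually resets the config
    // property instead of being a no-op, so NMs hosting task containers see
    // no upload policies and will not re-upload resources to the shared cache.
    Job.setFileSharedCacheUploadPolicies(conf, Collections.emptyMap());
    System.out.println(Job.getFileSharedCacheUploadPolicies(conf).size()); // 0
  }
}

The new test added to TestJobImpl exercises the same pattern through JobImpl.cleanupSharedCacheUploadPolicies, which is why the method's visibility is relaxed to package-private with @VisibleForTesting.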