
MAPREDUCE-2970. svn merge -c r1173534 --ignore-ancestry ../../trunk/

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1173536 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 13 years ago
parent
commit
83d9dc9f8c

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -1352,6 +1352,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3023. Fixed clients to display queue state correctly. (Ravi
     Prakash via acmurthy) 
 
+    MAPREDUCE-2970. Fixed NPEs in corner cases with different configurations
+    for mapreduce.framework.name. (Venu Gopala Rao via vinodkv)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
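
For context: before this patch, Cluster's constructors iterated over the ServiceLoader-discovered ClientProtocolProviders and simply fell through when none of them claimed the configuration, leaving the internal client null and deferring the failure to the first remote call. A minimal sketch of that corner case (the class name and the framework-name value "unregistered" are illustrative, not from the patch):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.MRConfig;

// Hypothetical driver; "unregistered" is a value no provider claims.
public class Mapreduce2970Repro {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set(MRConfig.FRAMEWORK_NAME, "unregistered");
    Cluster cluster = new Cluster(conf); // post-patch: IOException thrown here
    cluster.getClusterStatus();          // pre-patch: NullPointerException here
    cluster.close();
  }
}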

+ 24 - 13
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java

@@ -41,8 +41,8 @@ import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
 
 /**
  * Provides a way to access information about the map/reduce cluster.
@@ -68,30 +68,41 @@ public class Cluster {
   }
   
   public Cluster(Configuration conf) throws IOException {
-    this.conf = conf;
-    this.ugi = UserGroupInformation.getCurrentUser();
-    for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) {
-      ClientProtocol clientProtocol = provider.create(conf);
-      if (clientProtocol != null) {
-        clientProtocolProvider = provider;
-        client = clientProtocol;
-        break;
-      }
-    }
+    this(null, conf);
   }
 
   public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
       throws IOException {
     this.conf = conf;
     this.ugi = UserGroupInformation.getCurrentUser();
-    for (ClientProtocolProvider provider : ServiceLoader.load(ClientProtocolProvider.class)) {
-      ClientProtocol clientProtocol = provider.create(jobTrackAddr, conf);
+    initialize(jobTrackAddr, conf);
+  }
+  
+  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
+      throws IOException {
+
+    for (ClientProtocolProvider provider : ServiceLoader
+        .load(ClientProtocolProvider.class)) {
+      ClientProtocol clientProtocol = null;
+      if (jobTrackAddr == null) {
+        clientProtocol = provider.create(conf);
+      } else {
+        clientProtocol = provider.create(jobTrackAddr, conf);
+      }
+
       if (clientProtocol != null) {
         clientProtocolProvider = provider;
         client = clientProtocol;
         break;
       }
     }
+
+    if (null == clientProtocolProvider || null == client) {
+      throw new IOException(
+          "Cannot initialize Cluster. Please check your configuration for "
+              + MRConfig.FRAMEWORK_NAME
+              + " and the correspond server addresses.");
+    }
   }
 
   ClientProtocol getClient() {
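
With initialize() shared by both constructors, misconfiguration now surfaces as an IOException at construction time instead of a latent null client. A minimal caller-side sketch (hypothetical class name; the bad framework value is again illustrative):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.MRConfig;

public class ClusterInitCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set(MRConfig.FRAMEWORK_NAME, "unregistered"); // no provider claims this
    try {
      new Cluster(conf).close();
    } catch (IOException e) {
      // The message names mapreduce.framework.name and the corresponding
      // server addresses, per the initialize() hunk above.
      System.err.println("Cluster init failed: " + e.getMessage());
    }
  }
}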

+ 59 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java

@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.YARNRunner;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.junit.Test;
+
+public class TestYarnClientProtocolProvider extends TestCase {
+
+  @Test
+  public void testClusterWithYarnClientProtocolProvider() throws Exception {
+
+    Configuration conf = new Configuration(false);
+    Cluster cluster = null;
+
+    try {
+      cluster = new Cluster(conf);
+      fail("Cluster should not be initialized with out any framework name");
+    } catch (IOException e) {
+
+    }
+
+    try {
+      conf = new Configuration();
+      conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+      cluster = new Cluster(conf);
+      ClientProtocol client = cluster.getClient();
+      assertTrue(client instanceof YARNRunner);
+    } catch (IOException e) {
+
+    } finally {
+      if (cluster != null) {
+        cluster.close();
+      }
+    }
+  }
+}

+ 9 - 5
hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/JobTrackerClientProtocolProvider.java

@@ -43,20 +43,24 @@ public class JobTrackerClientProtocolProvider extends ClientProtocolProvider {
     String tracker = conf.get(JTConfig.JT_IPC_ADDRESS, "local");
     if (!"local".equals(tracker)) {
       return createRPCProxy(JobTracker.getAddress(conf), conf);
+    } else {
+      throw new IOException("Invalid \"" + JTConfig.JT_IPC_ADDRESS
+          + "\" configuration value for JobTracker: \""
+          + tracker + "\"");
     }
-    return null;
   }
 
   @Override
-  public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException {
+  public ClientProtocol create(InetSocketAddress addr, Configuration conf)
+      throws IOException {
     return createRPCProxy(addr, conf);
   }
-  
+
   private ClientProtocol createRPCProxy(InetSocketAddress addr,
       Configuration conf) throws IOException {
     return (ClientProtocol) RPC.getProxy(ClientProtocol.class,
-      ClientProtocol.versionID, addr, UserGroupInformation.getCurrentUser(),
-      conf, NetUtils.getSocketFactory(conf, ClientProtocol.class));
+        ClientProtocol.versionID, addr, UserGroupInformation.getCurrentUser(),
+        conf, NetUtils.getSocketFactory(conf, ClientProtocol.class));
   }
 
   @Override

+ 7 - 2
hadoop-mapreduce-project/src/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java

@@ -37,11 +37,16 @@ public class LocalClientProtocolProvider extends ClientProtocolProvider {
     if (framework != null && !framework.equals("local")) {
       return null;
     }
-    if ("local".equals(conf.get(JTConfig.JT_IPC_ADDRESS, "local"))) {
+    String tracker = conf.get(JTConfig.JT_IPC_ADDRESS, "local");
+    if ("local".equals(tracker)) {
       conf.setInt("mapreduce.job.maps", 1);
       return new LocalJobRunner(conf);
+    } else {
+      throw new IOException("Invalid \"" + JTConfig.JT_IPC_ADDRESS
+          + "\" configuration value for LocalJobRunner: \""
+          + tracker + "\"");
     }
-    return null;
   }
 
   @Override
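
Taken together, the two providers now reject a framework/address mismatch instead of returning null. A hedged summary of the combinations the new tests exercise (values taken from the tests below; not an exhaustive matrix):

Configuration conf = new Configuration();

// "local" framework + "local" JT address (the default) -> LocalJobRunner
conf.set(MRConfig.FRAMEWORK_NAME, "local");
conf.set(JTConfig.JT_IPC_ADDRESS, "local");

// "local" framework + host:port address -> IOException from
// LocalClientProtocolProvider (previously null, leading to an NPE later)

// "classic" framework + host:port address -> RPC proxy to that JobTracker

// "classic" framework + "local" address -> IOException from
// JobTrackerClientProtocolProvider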

+ 99 - 0
hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/TestClientProtocolProviderImpls.java

@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.LocalJobRunner;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.junit.Test;
+
+public class TestClientProtocolProviderImpls extends TestCase {
+
+  @Test
+  public void testClusterWithLocalClientProvider() throws Exception {
+
+    Configuration conf = new Configuration();
+
+    try {
+      conf.set(MRConfig.FRAMEWORK_NAME, "incorrect");
+      new Cluster(conf);
+      fail("Cluster should not be initialized with incorrect framework name");
+    } catch (IOException e) {
+
+    }
+
+    try {
+      conf.set(MRConfig.FRAMEWORK_NAME, "local");
+      conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0");
+
+      new Cluster(conf);
+      fail("Cluster with Local Framework name should use local JT address");
+    } catch (IOException e) {
+
+    }
+
+    try {
+      conf.set(JTConfig.JT_IPC_ADDRESS, "local");
+      Cluster cluster = new Cluster(conf);
+      assertTrue(cluster.getClient() instanceof LocalJobRunner);
+      cluster.close();
+    } catch (IOException e) {
+
+    }
+  }
+
+  @Test
+  public void testClusterWithJTClientProvider() throws Exception {
+
+    Configuration conf = new Configuration();
+    try {
+      conf.set(MRConfig.FRAMEWORK_NAME, "incorrect");
+      new Cluster(conf);
+      fail("Cluster should not be initialized with incorrect framework name");
+
+    } catch (IOException e) {
+
+    }
+
+    try {
+      conf.set(MRConfig.FRAMEWORK_NAME, "classic");
+      conf.set(JTConfig.JT_IPC_ADDRESS, "local");
+      new Cluster(conf);
+      fail("Cluster with classic Framework name shouldnot use local JT address");
+
+    } catch (IOException e) {
+
+    }
+
+    try {
+      conf = new Configuration();
+      conf.set(MRConfig.FRAMEWORK_NAME, "classic");
+      conf.set(JTConfig.JT_IPC_ADDRESS, "127.0.0.1:0");
+      Cluster cluster = new Cluster(conf);
+      cluster.close();
+    } catch (IOException e) {
+
+    }
+  }
+
+}