浏览代码

MAPREDUCE-2707. ProtoOverHadoopRpcEngine without using TunnelProtocol over WritableRpc (Jitendra Pandey via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/MR-279@1152124 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 14 年之前
父节点
当前提交
1056bab105
共有 28 个文件被更改,包括 378 次插入216 次删除
  1. 5 0
      mapreduce/CHANGES.txt
  2. 3 2
      mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
  3. 58 0
      mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRClientSecurityInfo.java
  4. 1 0
      mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
  5. 6 1
      mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/client/ClientHSSecurityInfo.java
  6. 1 1
      mapreduce/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
  7. 5 0
      mapreduce/mr-client/hadoop-mapreduce-client-jobclient/pom.xml
  8. 2 1
      mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
  9. 2 2
      mapreduce/pom.xml
  10. 12 0
      mapreduce/yarn/bin/yarn
  11. 2 2
      mapreduce/yarn/pom.xml
  12. 5 0
      mapreduce/yarn/yarn-common/pom.xml
  13. 3 1
      mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/RpcFactoryProvider.java
  14. 1 1
      mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnRPC.java
  15. 231 151
      mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java
  16. 0 47
      mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/TunnelProtocolSecurityInfo.java
  17. 5 1
      mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java
  18. 5 0
      mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java
  19. 4 1
      mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/SchedulerSecurityInfo.java
  20. 5 0
      mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/ClientRMSecurityInfo.java
  21. 4 1
      mapreduce/yarn/yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
  22. 4 0
      mapreduce/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/RMNMSecurityInfoClass.java
  23. 1 0
      mapreduce/yarn/yarn-server/yarn-server-common/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
  24. 2 4
      mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
  25. 5 0
      mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerSecurityInfo.java
  26. 1 0
      mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
  27. 4 0
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/security/admin/AdminSecurityInfo.java
  28. 1 0
      mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo

+ 5 - 0
mapreduce/CHANGES.txt

@@ -5,6 +5,11 @@ Trunk (unreleased changes)
 
     MAPREDUCE-279
 
+    Fixes for making MR-279 work with trunk common/hdfs. (mahadev)
+
+    MAPREDUCE-2707. ProtoOverHadoopRpcEngine without using TunnelProtocol over 
+    WritableRpc (Jitendra Pandey via mahadev)
+
     MAPREDUCE-2664. Implement JobCounters for MRv2. (Siddharth Seth via 
     sharad)
 

+ 3 - 2
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -433,7 +434,7 @@ public class TaskAttemptListenerImpl extends CompositeService
   @Override
   public ProtocolSignature getProtocolSignature(String protocol,
       long clientVersion, int clientMethodsHash) throws IOException {
-    return ProtocolSignature.getProtocolSigature(this, protocol, clientVersion,
-        clientMethodsHash);
+    return ProtocolSignature.getProtocolSignature(this, 
+        protocol, clientVersion, clientMethodsHash);
   }
 }

+ 58 - 0
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRClientSecurityInfo.java

@@ -0,0 +1,58 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.lang.annotation.Annotation;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.yarn.proto.MRClientProtocol;
+import org.apache.hadoop.yarn.security.ApplicationTokenSelector;
+
+public class MRClientSecurityInfo extends SecurityInfo {
+
+  @Override
+  public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
+    return null;
+  }
+
+  @Override
+  public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
+    if (!protocol.equals(MRClientProtocol.MRClientProtocolService.BlockingInterface.class)) {
+      return null;
+    }
+    return new TokenInfo() {
+
+      @Override
+      public Class<? extends Annotation> annotationType() {
+        return null;
+      }
+
+      @Override
+      public Class<? extends TokenSelector<? extends TokenIdentifier>>
+          value() {
+        return ApplicationTokenSelector.class;
+      }
+    };
+  }
+}

+ 1 - 0
mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo

@@ -0,0 +1 @@
+org.apache.hadoop.mapreduce.v2.app.MRClientSecurityInfo

+ 6 - 1
mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/client/ClientHSSecurityInfo.java

@@ -25,11 +25,16 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.yarn.proto.MRClientProtocol;
 
 public class ClientHSSecurityInfo extends SecurityInfo {
-
+  
   @Override
   public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
+    if (!protocol
+        .equals(MRClientProtocol.MRClientProtocolService.BlockingInterface.class)) {
+      return null;
+    }
     return new KerberosInfo() {
 
       @Override

+ 1 - 1
mapreduce/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java

@@ -440,7 +440,7 @@ class JobSubmitter {
       for(Token<?> token: credentials.getAllTokens()) {
         if (token.getKind().toString().equals("HDFS_DELEGATION_TOKEN")) {
           LOG.debug("Submitting with " +
-              DFSClient.stringifyToken((Token<org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier>) token));
+              org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier.stringifyToken(token));
         }
       }
     }

+ 5 - 0
mapreduce/mr-client/hadoop-mapreduce-client-jobclient/pom.xml

@@ -16,6 +16,11 @@
   </properties>
 
   <dependencies>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>1.5.2</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-common</artifactId>

+ 2 - 1
mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -556,7 +557,7 @@ public class YARNRunner implements ClientProtocol {
   @Override
   public ProtocolSignature getProtocolSignature(String protocol,
       long clientVersion, int clientMethodsHash) throws IOException {
-    return ProtocolSignature.getProtocolSigature(this, protocol, clientVersion,
+    return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion,
         clientMethodsHash);
   }
 }

+ 2 - 2
mapreduce/pom.xml

@@ -12,8 +12,8 @@
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <test.logs>true</test.logs>
     <test.timeout>600000</test.timeout>
-    <hadoop-common.version>0.22.0-SNAPSHOT</hadoop-common.version>
-    <hadoop-hdfs.version>0.22.0-SNAPSHOT</hadoop-hdfs.version>
+    <hadoop-common.version>0.23.0-SNAPSHOT</hadoop-common.version>
+    <hadoop-hdfs.version>0.23.0-SNAPSHOT</hadoop-hdfs.version>
     <hadoop-mapreduce.version>1.0-SNAPSHOT</hadoop-mapreduce.version>
     <yarn.version>1.0-SNAPSHOT</yarn.version>
     <install.pom>${project.build.directory}/saner-pom.xml</install.pom>

+ 12 - 0
mapreduce/yarn/bin/yarn

@@ -169,6 +169,18 @@ for f in $HADOOP_COMMON_HOME/lib/*.jar; do
   CLASSPATH=${CLASSPATH}:$f;
 done
 
+for f in $HADOOP_COMMON_HOME/share/hadoop/common/*.jar; do
+  CLASSPATH=${CLASSPATH}:$f;
+done
+
+for f in $HADOOP_COMMON_HOME/share/hadoop/common/lib/*.jar; do
+  CLASSPATH=${CLASSPATH}:$f;
+done
+
+for f in $HADOOP_COMMON_HOME/share/hadoop/hdfs/*.jar; do
+  CLASSPATH=${CLASSPATH}:$f;
+done
+
 if [ -d "$HADOOP_COMMON_HOME/build/ivy/lib/Hadoop-Common/common" ]; then
 for f in $HADOOP_COMMON_HOME/build/ivy/lib/Hadoop-Common/common/*.jar; do
   CLASSPATH=${CLASSPATH}:$f;

+ 2 - 2
mapreduce/yarn/pom.xml

@@ -12,8 +12,8 @@
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <test.logs>true</test.logs>
     <test.timeout>600000</test.timeout>
-    <hadoop-common.version>0.22.0-SNAPSHOT</hadoop-common.version>
-    <hadoop-hdfs.version>0.22.0-SNAPSHOT</hadoop-hdfs.version>
+    <hadoop-common.version>0.23.0-SNAPSHOT</hadoop-common.version>
+    <hadoop-hdfs.version>0.23.0-SNAPSHOT</hadoop-hdfs.version>
     <yarn.version>1.0-SNAPSHOT</yarn.version>
     <install.pom>${project.build.directory}/saner-pom.xml</install.pom>
     <install.file>${install.pom}</install.file>

+ 5 - 0
mapreduce/yarn/yarn-common/pom.xml

@@ -16,6 +16,11 @@
 
   <dependencies>
     <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>1.2.12</version>
+    </dependency> 
+   <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>yarn-api</artifactId>
     </dependency>

+ 3 - 1
mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/factory/providers/RpcFactoryProvider.java

@@ -3,6 +3,8 @@ package org.apache.hadoop.yarn.factory.providers;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.factories.RpcClientFactory;
@@ -14,7 +16,7 @@ import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl;
  * A public static get() method must be present in the Client/Server Factory implementation.
  */
 public class RpcFactoryProvider {
-  
+  private static final Log LOG = LogFactory.getLog(RpcFactoryProvider.class);
   //TODO Move these keys to CommonConfigurationKeys
   public static String RPC_SERIALIZER_KEY = "org.apache.yarn.ipc.rpc.serializer.property";
   public static String RPC_SERIALIZER_DEFAULT = "protocolbuffers";

+ 1 - 1
mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/HadoopYarnRPC.java

@@ -100,4 +100,4 @@ public class HadoopYarnRPC extends YarnRPC {
 
   }
 
-}
+}

+ 231 - 151
mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java

@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.yarn.ipc;
 
 import java.io.Closeable;
@@ -15,15 +33,13 @@ import javax.net.SocketFactory;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.ProtocolProxy;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RpcEngine;
-import org.apache.hadoop.ipc.ProtocolProxy;
-import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hadoop.ipc.WritableRpcEngine;
+import org.apache.hadoop.ipc.ClientCache;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -35,24 +51,17 @@ import com.google.protobuf.BlockingService;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
 import com.google.protobuf.ServiceException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
 
 
 @InterfaceStability.Evolving
 public class ProtoOverHadoopRpcEngine implements RpcEngine {
   private static final Log LOG = LogFactory.getLog(RPC.class);
   
-  private static final RpcEngine ENGINE = new WritableRpcEngine();
-
-  /** Tunnel a Proto RPC request and response through Hadoop's RPC. */
-  public static interface TunnelProtocol extends VersionedProtocol {
-    /** WritableRpcEngine requires a versionID */
-    public static final long versionID = 1L;
-
-    /** All Proto methods and responses go through this. */
-    ProtoSpecificResponseWritable call(ProtoSpecificRequestWritable request) throws IOException;
-  }
-
-
+  private static final ClientCache CLIENTS=new ClientCache();
+  
   @Override
   @SuppressWarnings("unchecked")
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
@@ -74,30 +83,69 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
   }
 
   private class Invoker implements InvocationHandler, Closeable {
-    private TunnelProtocol tunnel;
     private Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>();
+    private boolean isClosed = false;
+    private Client.ConnectionId remoteId;
+    private Client client;
 
     public Invoker(Class<?> protocol, InetSocketAddress addr,
         UserGroupInformation ticket, Configuration conf, SocketFactory factory,
         int rpcTimeout) throws IOException {
-      this.tunnel = ENGINE.getProxy(TunnelProtocol.class,
-          TunnelProtocol.versionID, addr, ticket, conf, factory, rpcTimeout)
-          .getProxy();
+      this.remoteId = Client.ConnectionId.getConnectionId(addr, protocol,
+          ticket, rpcTimeout, conf);
+      this.client = CLIENTS.getClient(conf, factory,
+          ProtoSpecificResponseWritable.class);
+    }
+
+    private ProtoSpecificRpcRequest constructRpcRequest(Method method,
+        Object[] params) throws ServiceException {
+      ProtoSpecificRpcRequest rpcRequest;
+      ProtoSpecificRpcRequest.Builder builder;
+
+      builder = ProtoSpecificRpcRequest.newBuilder();
+      builder.setMethodName(method.getName());
+
+      if (params.length != 2) { // RpcController + Message
+        throw new ServiceException("Too many parameters for request. Method: ["
+            + method.getName() + "]" + ", Expected: 2, Actual: "
+            + params.length);
+      }
+      if (params[1] == null) {
+        throw new ServiceException("null param while calling Method: ["
+            + method.getName() + "]");
+      }
+
+      Message param = (Message) params[1];
+      builder.setRequestProto(param.toByteString());
+
+      rpcRequest = builder.build();
+      return rpcRequest;
     }
 
     @Override
     public Object invoke(Object proxy, Method method, Object[] args)
         throws Throwable {
+      long startTime = 0;
+      if (LOG.isDebugEnabled()) {
+        startTime = System.currentTimeMillis();
+      }
+
       ProtoSpecificRpcRequest rpcRequest = constructRpcRequest(method, args);
       ProtoSpecificResponseWritable val = null;
       try {
-        val = tunnel.call(new ProtoSpecificRequestWritable(rpcRequest));
+        val = (ProtoSpecificResponseWritable) client.call(
+            new ProtoSpecificRequestWritable(rpcRequest), remoteId);
       } catch (Exception e) {
         throw new ServiceException(e);
       }
       
       ProtoSpecificRpcResponse response = val.message;
-
+   
+      if (LOG.isDebugEnabled()) {
+        long callTime = System.currentTimeMillis() - startTime;
+        LOG.debug("Call: " + method.getName() + " " + callTime);
+      }
+ 
       if (response.hasIsError() && response.getIsError() == true) {
         YarnRemoteExceptionPBImpl exception = new YarnRemoteExceptionPBImpl(response.getException());
         exception.fillInStackTrace();
@@ -110,141 +158,63 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
         prototype = getReturnProtoType(method);
       } catch (Exception e) {
         throw new ServiceException(e);
-//        YarnRemoteExceptionPBImpl exception = new YarnRemoteExceptionPBImpl("Could not get prototype PB return type for method: [" + method.getName() + "]", e);
       }
-      Message actualReturnMessage = prototype.newBuilderForType().mergeFrom(response.getResponseProto()).build();
+      Message actualReturnMessage = prototype.newBuilderForType()
+          .mergeFrom(response.getResponseProto()).build();
       return actualReturnMessage;
     }
 
     public void close() throws IOException {
-      ENGINE.stopProxy(tunnel);
+      if (!isClosed) {
+        isClosed = true;
+        CLIENTS.stopClient(client);
+      }
     }
-    
+
     private Message getReturnProtoType(Method method) throws Exception {
       if (returnTypes.containsKey(method.getName())) {
         return returnTypes.get(method.getName());
       } else {
         Class<?> returnType = method.getReturnType();
 
-        Method newInstMethod = returnType.getMethod("getDefaultInstance", null);
+        Method newInstMethod = returnType.getMethod("getDefaultInstance");
         newInstMethod.setAccessible(true);
-        Message prototype = (Message) newInstMethod.invoke(null, null);
+        Message prototype = (Message) newInstMethod.invoke(null,
+            (Object[]) null);
         returnTypes.put(method.getName(), prototype);
         return prototype;
       }
     }
   }
+  
+  /**
+   * Writable Wrapper for Protocol Buffer Requests
+   */
+  private static class ProtoSpecificRequestWritable implements Writable {
+    ProtoSpecificRpcRequest message;
 
-  private class TunnelResponder implements TunnelProtocol {
-    BlockingService service;
-    
-    public TunnelResponder(Class<?> iface, Object impl) {
-      this.service = (BlockingService)impl;
-    }
-
-    public long getProtocolVersion(String protocol, long version)
-        throws IOException {
-      return TunnelProtocol.versionID;
-    }
-    
-    @Override
-    public ProtocolSignature getProtocolSignature(
-        String protocol, long version, int clientMethodsHashCode)
-      throws IOException {
-      return new ProtocolSignature(TunnelProtocol.versionID, null);
-    }
-
-    public ProtoSpecificResponseWritable call(final ProtoSpecificRequestWritable request)
-        throws IOException {
-      ProtoSpecificRpcRequest rpcRequest = request.message;
-      String methodName = rpcRequest.getMethodName();
-      MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
-      
-      Message prototype = service.getRequestPrototype(methodDescriptor);
-      Message param = prototype.newBuilderForType().mergeFrom(rpcRequest.getRequestProto()).build();
-      
-      Message result;
-      try {
-        result = service.callBlockingMethod(methodDescriptor, null, param);
-      } catch (ServiceException e) {
-        return handleException(e);
-      } catch (Exception e) {
-        return handleException(e);
-      }
-      
-      ProtoSpecificRpcResponse response = constructProtoSpecificRpcSuccessResponse(result);
-      return new ProtoSpecificResponseWritable(response);
+    @SuppressWarnings("unused")
+    public ProtoSpecificRequestWritable() {
     }
     
-    private ProtoSpecificResponseWritable handleException (Throwable e) {
-      ProtoSpecificRpcResponse.Builder builder = ProtoSpecificRpcResponse.newBuilder();
-      builder.setIsError(true);
-      if (e.getCause() instanceof YarnRemoteExceptionPBImpl) {
-        builder.setException(((YarnRemoteExceptionPBImpl)e.getCause()).getProto());
-      } else {
-        builder.setException(new YarnRemoteExceptionPBImpl(e).getProto());
-      }
-      ProtoSpecificRpcResponse response = builder.build();
-      return new ProtoSpecificResponseWritable(response);
+    ProtoSpecificRequestWritable(ProtoSpecificRpcRequest message) {
+      this.message = message;
     }
-  }
-
-  @Override
-  public Object[] call(Method method, Object[][] params,
-      InetSocketAddress[] addrs, UserGroupInformation ticket, Configuration conf)
-      throws IOException, InterruptedException {
-    throw new UnsupportedOperationException();
-  }
-
-  
-  @Override
-  public RPC.Server getServer(Class<?> protocol, Object instance,
-      String bindAddress, int port, int numHandlers, boolean verbose,
-      Configuration conf, SecretManager<? extends TokenIdentifier> secretManager)
-      throws IOException {
-
-    return ENGINE
-        .getServer(TunnelProtocol.class,
-            new TunnelResponder(protocol, instance), bindAddress, port,
-            numHandlers, verbose, conf, secretManager);
-  }
 
-  
-  private Class<?>[] getRequestParameterTypes(Message[] messages) {
-    Class<?> [] paramTypes = new Class<?>[messages.length];
-    for (int i = 0 ; i < messages.length ; i++) {
-      paramTypes[i] = messages[i].getClass();
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeInt(message.toByteArray().length);
+      out.write(message.toByteArray());
     }
-    return paramTypes;
-  }
-
-  private ProtoSpecificRpcRequest constructRpcRequest(Method method,
-      Object[] params) throws ServiceException {
-    ProtoSpecificRpcRequest rpcRequest;
-    ProtoSpecificRpcRequest.Builder builder;
-
-    builder = ProtoSpecificRpcRequest.newBuilder();
-    builder.setMethodName(method.getName());
 
-    if (params.length != 2) { //RpcController + Message
-      throw new ServiceException("Too many parameters for request. Method: [" + method.getName() + "]" + ", Expected: 2, Actual: " + params.length);
-    }
-    if (params[1] == null) {
-      throw new ServiceException("null param while calling Method: [" + method.getName() +"]");
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      int length = in.readInt();
+      byte[] bytes = new byte[length];
+      in.readFully(bytes);
+      message = ProtoSpecificRpcRequest.parseFrom(bytes);
     }
-
-    Message param = (Message) params[1];
-    builder.setRequestProto(param.toByteString());
-
-    rpcRequest = builder.build();
-    return rpcRequest;
   }
-
-  private ProtoSpecificRpcResponse constructProtoSpecificRpcSuccessResponse(Message message) {
-    ProtoSpecificRpcResponse res = ProtoSpecificRpcResponse.newBuilder().setResponseProto(message.toByteString()).build();
-    return res;
-  }
-
   
   /**
    * Writable Wrapper for Protocol Buffer Responses
@@ -261,7 +231,6 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
 
     @Override
     public void write(DataOutput out) throws IOException {
-//      System.err.println("XXX: writing length: " + message.toByteArray().length);
       out.writeInt(message.toByteArray().length);
       out.write(message.toByteArray());
     }
@@ -269,38 +238,149 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
     @Override
     public void readFields(DataInput in) throws IOException {
       int length = in.readInt();
-//      System.err.println("YYY: Reading length: " + length);
       byte[] bytes = new byte[length];
       in.readFully(bytes);
       message = ProtoSpecificRpcResponse.parseFrom(bytes);
     }
   }
   
-  /**
-   * Writable Wrapper for Protocol Buffer Requests
-   */
-  public static class ProtoSpecificRequestWritable implements Writable {
-    ProtoSpecificRpcRequest message;
+  @Override
+  public Object[] call(Method method, Object[][] params,
+      InetSocketAddress[] addrs, UserGroupInformation ticket, Configuration conf)
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
 
-    public ProtoSpecificRequestWritable() {
+  // for unit testing only
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  static Client getClient(Configuration conf) {
+    return CLIENTS.getClient(conf, SocketFactory.getDefault(),
+        ProtoSpecificResponseWritable.class);
+  }
+
+  public static class Server extends RPC.Server {
+
+    private BlockingService service;
+    private boolean verbose;
+//
+//    /**
+//     * Construct an RPC server.
+//     * 
+//     * @param instance
+//     *          the instance whose methods will be called
+//     * @param conf
+//     *          the configuration to use
+//     * @param bindAddress
+//     *          the address to bind on to listen for connection
+//     * @param port
+//     *          the port to listen for connections on
+//     */
+//    public Server(Object instance, Configuration conf, String bindAddress,
+//        int port) throws IOException {
+//      this(instance, conf, bindAddress, port, 1, false, null);
+//    }
+
+    private static String classNameBase(String className) {
+      String[] names = className.split("\\.", -1);
+      if (names == null || names.length == 0) {
+        return className;
+      }
+      return names[names.length - 1];
     }
-    
-    public ProtoSpecificRequestWritable(ProtoSpecificRpcRequest message) {
-      this.message = message;
+
+    /**
+     * Construct an RPC server.
+     * 
+     * @param instance
+     *          the instance whose methods will be called
+     * @param conf
+     *          the configuration to use
+     * @param bindAddress
+     *          the address to bind on to listen for connection
+     * @param port
+     *          the port to listen for connections on
+     * @param numHandlers
+     *          the number of method handler threads to run
+     * @param verbose
+     *          whether each call should be logged
+     */
+    public Server(Object instance, Configuration conf, String bindAddress,
+        int port, int numHandlers, int numReaders, 
+        int queueSizePerHandler, boolean verbose,
+        SecretManager<? extends TokenIdentifier> secretManager)
+        throws IOException {
+      super(bindAddress, port, ProtoSpecificRequestWritable.class, numHandlers,
+          numReaders, queueSizePerHandler, conf, classNameBase(instance.getClass().getName()), secretManager);
+      this.service = (BlockingService) instance;
+      this.verbose = verbose;
     }
 
     @Override
-    public void write(DataOutput out) throws IOException {
-      out.writeInt(message.toByteArray().length);
-      out.write(message.toByteArray());
+    public Writable call(Class<?> protocol, Writable writableRequest,
+        long receiveTime) throws IOException {
+      ProtoSpecificRequestWritable request = (ProtoSpecificRequestWritable) writableRequest;
+      ProtoSpecificRpcRequest rpcRequest = request.message;
+      String methodName = rpcRequest.getMethodName();
+      System.out.println("Call: protocol=" + protocol.getCanonicalName() + ", method="
+          + methodName);
+      if (verbose)
+        log("Call: protocol=" + protocol.getCanonicalName() + ", method="
+            + methodName);
+      MethodDescriptor methodDescriptor = service.getDescriptorForType()
+          .findMethodByName(methodName);
+      Message prototype = service.getRequestPrototype(methodDescriptor);
+      Message param = prototype.newBuilderForType()
+          .mergeFrom(rpcRequest.getRequestProto()).build();
+      Message result;
+      try {
+        result = service.callBlockingMethod(methodDescriptor, null, param);
+      } catch (ServiceException e) {
+        e.printStackTrace();
+        return handleException(e);
+      } catch (Exception e) {
+        return handleException(e);
+      }
+
+      ProtoSpecificRpcResponse response = constructProtoSpecificRpcSuccessResponse(result);
+      return new ProtoSpecificResponseWritable(response);
     }
 
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      int length = in.readInt();
-      byte[] bytes = new byte[length];
-      in.readFully(bytes);
-      message = ProtoSpecificRpcRequest.parseFrom(bytes);
+    private ProtoSpecificResponseWritable handleException(Throwable e) {
+      ProtoSpecificRpcResponse.Builder builder = ProtoSpecificRpcResponse
+          .newBuilder();
+      builder.setIsError(true);
+      if (e.getCause() instanceof YarnRemoteExceptionPBImpl) {
+        builder.setException(((YarnRemoteExceptionPBImpl) e.getCause())
+            .getProto());
+      } else {
+        builder.setException(new YarnRemoteExceptionPBImpl(e).getProto());
+      }
+      ProtoSpecificRpcResponse response = builder.build();
+      return new ProtoSpecificResponseWritable(response);
     }
+
+    private ProtoSpecificRpcResponse constructProtoSpecificRpcSuccessResponse(
+        Message message) {
+      ProtoSpecificRpcResponse res = ProtoSpecificRpcResponse.newBuilder()
+          .setResponseProto(message.toByteString()).build();
+      return res;
+    }
+  }
+
+  private static void log(String value) {
+    if (value != null && value.length() > 55)
+      value = value.substring(0, 55) + "...";
+    LOG.info(value);
+  }
+
+  @Override
+  public RPC.Server getServer(Class<?> protocol, Object instance,
+      String bindAddress, int port, int numHandlers,int numReaders, 
+      int queueSizePerHandler, boolean verbose,
+      Configuration conf, SecretManager<? extends TokenIdentifier> secretManager)
+      throws IOException {
+    return new Server(instance, conf, bindAddress, port, numHandlers, numReaders, queueSizePerHandler,
+        verbose, secretManager);
   }
 }

+ 0 - 47
mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/TunnelProtocolSecurityInfo.java

@@ -1,47 +0,0 @@
-package org.apache.hadoop.yarn.ipc;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.AnnotatedSecurityInfo;
-import org.apache.hadoop.security.KerberosInfo;
-import org.apache.hadoop.security.SecurityInfo;
-import org.apache.hadoop.security.token.TokenInfo;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.ipc.ProtoOverHadoopRpcEngine.TunnelProtocol;
-
-public class TunnelProtocolSecurityInfo extends SecurityInfo {
-  public static final Log LOG = LogFactory.getLog(TunnelProtocolSecurityInfo.class);
-  
-  @Override
-  public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
-    LOG.info("Get kerberos info being called, Tunnelprotocolinfo " + protocol);
-    if (TunnelProtocol.class.equals(protocol)) {
-      try {
-        LOG.info("The Tunnel Security info class " + conf.get(YarnConfiguration.YARN_SECURITY_INFO));
-        Class<SecurityInfo> secInfoClass = (Class<SecurityInfo>)  conf.getClass(
-            YarnConfiguration.YARN_SECURITY_INFO, SecurityInfo.class);
-        SecurityInfo secInfo = secInfoClass.newInstance();
-        return secInfo.getKerberosInfo(protocol, conf);
-      } catch (Exception e) {
-        throw new RuntimeException("Unable to load class", e);
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
-    if (TunnelProtocol.class.equals(protocol)) {
-      try {
-        Class<SecurityInfo> secInfoClass = (Class<SecurityInfo>)  conf.getClass(
-            YarnConfiguration.YARN_SECURITY_INFO, AnnotatedSecurityInfo.class);
-        SecurityInfo secInfo = secInfoClass.newInstance();
-        return secInfo.getTokenInfo(protocol, conf);
-      } catch (Exception e) {
-        throw new RuntimeException("Unable to load Yarn Security Info class", e);
-      }
-    }
-    return null;
-  }
-}

+ 5 - 1
mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/YarnRPC.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.ipc;
 import java.net.InetSocketAddress;
 
 import org.apache.avro.ipc.Server;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -30,7 +32,8 @@ import org.apache.hadoop.yarn.YarnException;
  * Abstraction to get the RPC implementation for Yarn.
  */
 public abstract class YarnRPC {
-
+  private static final Log LOG = LogFactory.getLog(YarnRPC.class);
+  
   public static final String RPC_CLASSNAME 
       = "org.apache.hadoop.yarn.ipc.YarnRPC.classname";
 
@@ -47,6 +50,7 @@ public abstract class YarnRPC {
       int numHandlers);
 
   public static YarnRPC create(Configuration conf) {
+    LOG.info("Creating YarnRPC for " + conf.get(RPC_CLASSNAME));
     String clazzName = conf.get(RPC_CLASSNAME);
     if (clazzName == null) {
       clazzName = DEFAULT_RPC;

+ 5 - 0
mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenInfo;
 import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.yarn.proto.ContainerManager;
 
 public class ContainerManagerSecurityInfo extends SecurityInfo {
 
@@ -36,6 +37,10 @@ public class ContainerManagerSecurityInfo extends SecurityInfo {
 
   @Override
   public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
+    if (!protocol
+        .equals(ContainerManager.ContainerManagerService.BlockingInterface.class)) {
+      return null;
+    }
     return new TokenInfo() {
 
       @Override

+ 4 - 1
mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/SchedulerSecurityInfo.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenInfo;
 import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.yarn.proto.AMRMProtocol;
 
 public class SchedulerSecurityInfo extends SecurityInfo {
 
@@ -36,6 +37,9 @@ public class SchedulerSecurityInfo extends SecurityInfo {
 
   @Override
   public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
+    if (!protocol.equals(AMRMProtocol.AMRMProtocolService.BlockingInterface.class)) {
+      return null;
+    }
     return new TokenInfo() {
 
       @Override
@@ -50,5 +54,4 @@ public class SchedulerSecurityInfo extends SecurityInfo {
       }
     };
   }
-
 }

+ 5 - 0
mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/client/ClientRMSecurityInfo.java

@@ -25,11 +25,16 @@ import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.token.TokenInfo;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.ClientRMProtocol;
 
 public class ClientRMSecurityInfo extends SecurityInfo {
 
   @Override
   public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
+    if (!protocol
+        .equals(ClientRMProtocol.ClientRMProtocolService.BlockingInterface.class)) {
+      return null;
+    }
     return new KerberosInfo() {
 
       @Override

+ 4 - 1
mapreduce/yarn/yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo

@@ -1 +1,4 @@
-org.apache.hadoop.yarn.ipc.TunnelProtocolSecurityInfo
+org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo
+org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo
+org.apache.hadoop.yarn.security.SchedulerSecurityInfo
+

+ 4 - 0
mapreduce/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/RMNMSecurityInfoClass.java

@@ -25,11 +25,15 @@ import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.token.TokenInfo;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.ResourceTracker;
 
 public class RMNMSecurityInfoClass extends SecurityInfo {
 
   @Override
   public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
+    if (!protocol.equals(ResourceTracker.ResourceTrackerService.BlockingInterface.class)) {
+      return null;
+    }
     return new KerberosInfo() {
 
       @Override

+ 1 - 0
mapreduce/yarn/yarn-server/yarn-server-common/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo

@@ -0,0 +1 @@
+org.apache.hadoop.yarn.server.RMNMSecurityInfoClass

+ 2 - 4
mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java

@@ -100,8 +100,7 @@ public class ContainerLaunch implements Callable<Integer> {
       String appIdStr = app.toString();
       Path containerLogDir =
           this.logDirsSelector.getLocalPathForWrite(appIdStr + Path.SEPARATOR
-              + containerIdStr, LocalDirAllocator.SIZE_UNKNOWN, this.conf,
-              false);
+              + containerIdStr, LocalDirAllocator.SIZE_UNKNOWN, this.conf);
       for (String str : command) {
         // TODO: Should we instead work via symlinks without this grammar?
         newCmds.add(str.replace(ApplicationConstants.LOG_DIR_EXPANSION_VAR,
@@ -148,8 +147,7 @@ public class ContainerLaunch implements Callable<Integer> {
               + Path.SEPARATOR + user + Path.SEPARATOR
               + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr
               + Path.SEPARATOR + containerIdStr,
-              LocalDirAllocator.SIZE_UNKNOWN, this.conf, false);
-
+              LocalDirAllocator.SIZE_UNKNOWN, this.conf);
       try {
         // /////////// Write out the container-script in the nmPrivate space.
         String[] localDirs =

+ 5 - 0
mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerSecurityInfo.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenInfo;
 import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.yarn.proto.LocalizationProtocol;
 
 public class LocalizerSecurityInfo extends SecurityInfo {
 
@@ -36,6 +37,10 @@ public class LocalizerSecurityInfo extends SecurityInfo {
 
   @Override
   public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
+    if (!protocol
+        .equals(LocalizationProtocol.LocalizationProtocolService.BlockingInterface.class)) {
+      return null;
+    }
     return new TokenInfo() {
 
       @Override

+ 1 - 0
mapreduce/yarn/yarn-server/yarn-server-nodemanager/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo

@@ -0,0 +1 @@
+org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo

+ 4 - 0
mapreduce/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/security/admin/AdminSecurityInfo.java → mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/security/admin/AdminSecurityInfo.java

@@ -7,11 +7,15 @@ import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.token.TokenInfo;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.RMAdminProtocol;
 
 public class AdminSecurityInfo extends SecurityInfo {
 
   @Override
   public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
+    if (!protocol.equals(RMAdminProtocol.RMAdminProtocolService.BlockingInterface.class)) {
+      return null;
+    }
     return new KerberosInfo() {
 
       @Override

+ 1 - 0
mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo

@@ -0,0 +1 @@
+org.apache.hadoop.yarn.security.admin.AdminSecurityInfo