ソースを参照

HADOOP-10285. Admin interface to swap callqueue at runtime. (Contributed by Chris Li)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1573052 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal 11 年 前
コミット
d00605f8f0
14 ファイル変更365 行追加1 行削除
  1. 3 0
      hadoop-common-project/hadoop-common/CHANGES.txt
  2. 4 0
      hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
  3. 1 0
      hadoop-common-project/hadoop-common/pom.xml
  4. 48 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RefreshCallQueueProtocol.java
  5. 74 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/RefreshCallQueueProtocolClientSideTranslatorPB.java
  6. 37 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/RefreshCallQueueProtocolPB.java
  7. 55 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/RefreshCallQueueProtocolServerSideTranslatorPB.java
  8. 52 0
      hadoop-common-project/hadoop-common/src/main/proto/RefreshCallQueueProtocol.proto
  9. 15 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
  10. 3 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
  11. 24 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  12. 2 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
  13. 34 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
  14. 13 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestIsMethodSupported.java

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -314,6 +314,9 @@ Release 2.5.0 - UNRELEASED
     HADOOP-10278. Refactor to make CallQueue pluggable. (Chris Li via
     Arpit Agarwal)
 
+    HADOOP-10285. Admin interface to swap callqueue at runtime. (Chris Li via
+    Arpit Agarwal)
+
   OPTIMIZATIONS
 
   BUG FIXES 

+ 4 - 0
hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml

@@ -301,6 +301,10 @@
       <!-- protobuf generated code -->
       <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.TestProtos.*"/>
     </Match>
+    <Match>
+      <!-- protobuf generated code -->
+      <Class name="~org\.apache\.hadoop\.ipc\.proto\.RefreshCallQueueProtocolProtos.*"/>
+    </Match>
 
     <!--
        Manually checked, misses child thread manually syncing on parent's intrinsic lock.

+ 1 - 0
hadoop-common-project/hadoop-common/pom.xml

@@ -337,6 +337,7 @@
                   <include>GetUserMappingsProtocol.proto</include>
                   <include>RefreshAuthorizationPolicyProtocol.proto</include>
                   <include>RefreshUserMappingsProtocol.proto</include>
+                  <include>RefreshCallQueueProtocol.proto</include>
                 </includes>
               </source>
               <output>${project.build.directory}/generated-sources/java</output>

+ 48 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RefreshCallQueueProtocol.java

@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.retry.Idempotent;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol which is used to refresh the call queue in use currently.
+ */
+@KerberosInfo(
+    serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+@InterfaceStability.Evolving
+public interface RefreshCallQueueProtocol {
+  
+  /**
+   * Version 1: Initial version
+   */
+  public static final long versionID = 1L;
+
+  /**
+   * Refresh the callqueue.
+   * @throws IOException
+   */
+  @Idempotent
+  void refreshCallQueue() throws IOException;
+}

+ 74 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/RefreshCallQueueProtocolClientSideTranslatorPB.java

@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcClientUtil;
+import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
+import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueRequestProto;
+import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class RefreshCallQueueProtocolClientSideTranslatorPB implements
+    ProtocolMetaInterface, RefreshCallQueueProtocol, Closeable {
+
+  /** RpcController is not used and hence is set to null */
+  private final static RpcController NULL_CONTROLLER = null;
+  private final RefreshCallQueueProtocolPB rpcProxy;
+  
+  private final static RefreshCallQueueRequestProto
+  VOID_REFRESH_CALL_QUEUE_REQUEST =
+      RefreshCallQueueRequestProto.newBuilder().build();
+
+  public RefreshCallQueueProtocolClientSideTranslatorPB(
+      RefreshCallQueueProtocolPB rpcProxy) {
+    this.rpcProxy = rpcProxy;
+  }
+
+  @Override
+  public void close() throws IOException {
+    RPC.stopProxy(rpcProxy);
+  }
+
+  @Override
+  public void refreshCallQueue() throws IOException {
+    try {
+      rpcProxy.refreshCallQueue(NULL_CONTROLLER,
+          VOID_REFRESH_CALL_QUEUE_REQUEST);
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
+
+  @Override
+  public boolean isMethodSupported(String methodName) throws IOException {
+    return RpcClientUtil.isMethodSupported(rpcProxy,
+        RefreshCallQueueProtocolPB.class,
+        RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+        RPC.getProtocolVersion(RefreshCallQueueProtocolPB.class),
+        methodName);
+  }
+}

+ 37 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/RefreshCallQueueProtocolPB.java

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService;
+
+@KerberosInfo(
+    serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
+@ProtocolInfo(
+    protocolName = "org.apache.hadoop.ipc.RefreshCallQueueProtocol", 
+    protocolVersion = 1)
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+@InterfaceStability.Evolving
+public interface RefreshCallQueueProtocolPB extends
+  RefreshCallQueueProtocolService.BlockingInterface {
+}

+ 55 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/RefreshCallQueueProtocolServerSideTranslatorPB.java

@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
+import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueRequestProto;
+import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueResponseProto;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class RefreshCallQueueProtocolServerSideTranslatorPB implements
+    RefreshCallQueueProtocolPB {
+
+  private final RefreshCallQueueProtocol impl;
+
+  private final static RefreshCallQueueResponseProto
+  VOID_REFRESH_CALL_QUEUE_RESPONSE = RefreshCallQueueResponseProto
+      .newBuilder().build();
+
+  public RefreshCallQueueProtocolServerSideTranslatorPB(
+      RefreshCallQueueProtocol impl) {
+    this.impl = impl;
+  }
+
+  @Override
+  public RefreshCallQueueResponseProto refreshCallQueue(
+      RpcController controller, RefreshCallQueueRequestProto request)
+      throws ServiceException {
+    try {
+      impl.refreshCallQueue();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return VOID_REFRESH_CALL_QUEUE_RESPONSE;
+  }
+}

+ 52 - 0
hadoop-common-project/hadoop-common/src/main/proto/RefreshCallQueueProtocol.proto

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and stable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *stable* .proto interface.
+ */
+
+option java_package = "org.apache.hadoop.ipc.proto";
+option java_outer_classname = "RefreshCallQueueProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.common;
+
+/**
+ *  Refresh callqueue request.
+ */
+message RefreshCallQueueRequestProto {
+}
+
+/**
+ * void response.
+ */
+message RefreshCallQueueResponseProto {
+}
+
+/**
+ * Protocol which is used to refresh the callqueue.
+ */
+service RefreshCallQueueProtocolService {
+  /**
+   * Refresh the callqueue.
+   */
+  rpc refreshCallQueue(RefreshCallQueueRequestProto)
+      returns(RefreshCallQueueResponseProto);
+}

+ 15 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java

@@ -75,6 +75,9 @@ import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolC
 import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolPB;
 import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
 import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
+import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
+import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
+import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolClientSideTranslatorPB;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
 import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
@@ -252,13 +255,16 @@ public class NameNodeProxies {
     } else if (xface == RefreshAuthorizationPolicyProtocol.class) {
       proxy = (T) createNNProxyWithRefreshAuthorizationPolicyProtocol(nnAddr,
           conf, ugi);
+    } else if (xface == RefreshCallQueueProtocol.class) {
+      proxy = (T) createNNProxyWithRefreshCallQueueProtocol(nnAddr, conf, ugi);
     } else {
-      String message = "Upsupported protocol found when creating the proxy " +
+      String message = "Unsupported protocol found when creating the proxy " +
           "connection to NameNode: " +
           ((xface != null) ? xface.getClass().getName() : "null");
       LOG.error(message);
       throw new IllegalStateException(message);
     }
+
     return new ProxyAndInfo<T>(proxy, dtService);
   }
   
@@ -286,6 +292,14 @@ public class NameNodeProxies {
     return new RefreshUserMappingsProtocolClientSideTranslatorPB(proxy);
   }
 
+  private static RefreshCallQueueProtocol
+      createNNProxyWithRefreshCallQueueProtocol(InetSocketAddress address,
+          Configuration conf, UserGroupInformation ugi) throws IOException {
+    RefreshCallQueueProtocolPB proxy = (RefreshCallQueueProtocolPB)
+        createNameNodeProxy(address, conf, ugi, RefreshCallQueueProtocolPB.class, 0);
+    return new RefreshCallQueueProtocolClientSideTranslatorPB(proxy);
+  }
+
   private static GetUserMappingsProtocol createNNProxyWithGetUserMappingsProtocol(
       InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
       throws IOException {

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -56,6 +56,7 @@ import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
+import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 import org.apache.hadoop.util.ExitUtil.ExitException;
 import org.apache.hadoop.util.GenericOptionsParser;
@@ -224,6 +225,8 @@ public class NameNode implements NameNodeStatusMXBean {
       return RefreshAuthorizationPolicyProtocol.versionID;
     } else if (protocol.equals(RefreshUserMappingsProtocol.class.getName())){
       return RefreshUserMappingsProtocol.versionID;
+    } else if (protocol.equals(RefreshCallQueueProtocol.class.getName())) {
+      return RefreshCallQueueProtocol.versionID;
     } else if (protocol.equals(GetUserMappingsProtocol.class.getName())){
       return GetUserMappingsProtocol.versionID;
     } else {

+ 24 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -138,6 +138,9 @@ import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolP
 import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolServerSideTranslatorPB;
 import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
 import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
+import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
@@ -215,6 +218,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
     BlockingService refreshUserMappingService = RefreshUserMappingsProtocolService
         .newReflectiveBlockingService(refreshUserMappingXlator);
 
+    RefreshCallQueueProtocolServerSideTranslatorPB refreshCallQueueXlator = 
+        new RefreshCallQueueProtocolServerSideTranslatorPB(this);
+    BlockingService refreshCallQueueService = RefreshCallQueueProtocolService
+        .newReflectiveBlockingService(refreshCallQueueXlator);
+
     GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator = 
         new GetUserMappingsProtocolServerSideTranslatorPB(this);
     BlockingService getUserMappingService = GetUserMappingsProtocolService
@@ -261,6 +269,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
           refreshAuthService, serviceRpcServer);
       DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class, 
           refreshUserMappingService, serviceRpcServer);
+      // We support Refreshing call queue here in case the client RPC queue is full
+      DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
+          refreshCallQueueService, serviceRpcServer);
       DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, 
           getUserMappingService, serviceRpcServer);
   
@@ -303,6 +314,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
         refreshAuthService, clientRpcServer);
     DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class, 
         refreshUserMappingService, clientRpcServer);
+    DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
+        refreshCallQueueService, clientRpcServer);
     DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, 
         getUserMappingService, clientRpcServer);
 
@@ -1095,6 +1108,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
 
     ProxyUsers.refreshSuperUserGroupsConfiguration();
   }
+
+  @Override // RefreshCallQueueProtocol
+  public void refreshCallQueue() {
+    LOG.info("Refreshing call queue.");
+
+    Configuration conf = new Configuration();
+    clientRpcServer.refreshCallQueue(conf);
+    if (this.serviceRpcServer != null) {
+      serviceRpcServer.refreshCallQueue(conf);
+    }
+  }
   
   @Override // GetUserMappingsProtocol
   public String[] getGroupsForUser(String user) throws IOException {

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
+import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 
 /** The full set of RPC methods implemented by the Namenode.  */
@@ -33,6 +34,7 @@ public interface NamenodeProtocols
           NamenodeProtocol,
           RefreshAuthorizationPolicyProtocol,
           RefreshUserMappingsProtocol,
+          RefreshCallQueueProtocol,
           GetUserMappingsProtocol,
           HAServiceProtocol {
 }

+ 34 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java

@@ -62,6 +62,7 @@ import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
+import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -580,6 +581,7 @@ public class DFSAdmin extends FsShell {
       "\t[-refreshServiceAcl]\n" +
       "\t[-refreshUserToGroupsMappings]\n" +
       "\t[refreshSuperUserGroupsConfiguration]\n" +
+      "\t[-refreshCallQueue]\n" +
       "\t[-printTopology]\n" +
       "\t[-refreshNamenodes datanodehost:port]\n"+
       "\t[-deleteBlockPool datanodehost:port blockpoolId [force]]\n"+
@@ -649,6 +651,8 @@ public class DFSAdmin extends FsShell {
     String refreshSuperUserGroupsConfiguration = 
       "-refreshSuperUserGroupsConfiguration: Refresh superuser proxy groups mappings\n";
 
+    String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n";
+
     String printTopology = "-printTopology: Print a tree of the racks and their\n" +
                            "\t\tnodes as reported by the Namenode\n";
     
@@ -717,6 +721,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(refreshUserToGroupsMappings);
     } else if ("refreshSuperUserGroupsConfiguration".equals(cmd)) {
       System.out.println(refreshSuperUserGroupsConfiguration);
+    } else if ("refreshCallQueue".equals(cmd)) {
+      System.out.println(refreshCallQueue);
     } else if ("printTopology".equals(cmd)) {
       System.out.println(printTopology);
     } else if ("refreshNamenodes".equals(cmd)) {
@@ -750,6 +756,7 @@ public class DFSAdmin extends FsShell {
       System.out.println(refreshServiceAcl);
       System.out.println(refreshUserToGroupsMappings);
       System.out.println(refreshSuperUserGroupsConfiguration);
+      System.out.println(refreshCallQueue);
       System.out.println(printTopology);
       System.out.println(refreshNamenodes);
       System.out.println(deleteBlockPool);
@@ -939,6 +946,27 @@ public class DFSAdmin extends FsShell {
     return 0;
   }
 
+  public int refreshCallQueue() throws IOException {
+    // Get the current configuration
+    Configuration conf = getConf();
+    
+    // for security authorization
+    // server principal for this call   
+    // should be NN's one.
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY, 
+        conf.get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, ""));
+ 
+    // Create the client
+    RefreshCallQueueProtocol refreshProtocol =
+      NameNodeProxies.createProxy(conf, FileSystem.getDefaultUri(conf),
+          RefreshCallQueueProtocol.class).getProxy();
+
+    // Refresh the user-to-groups mappings
+    refreshProtocol.refreshCallQueue();
+    
+    return 0;
+  }
+
   /**
    * Displays format of commands.
    * @param cmd The command that is being executed.
@@ -995,6 +1023,9 @@ public class DFSAdmin extends FsShell {
     } else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " [-refreshSuperUserGroupsConfiguration]");
+    } else if ("-refreshCallQueue".equals(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+                         + " [-refreshCallQueue]");
     } else if ("-printTopology".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " [-printTopology]");
@@ -1026,6 +1057,7 @@ public class DFSAdmin extends FsShell {
       System.err.println("           [-refreshServiceAcl]");
       System.err.println("           [-refreshUserToGroupsMappings]");
       System.err.println("           [-refreshSuperUserGroupsConfiguration]");
+      System.err.println("           [-refreshCallQueue]");
       System.err.println("           [-printTopology]");
       System.err.println("           [-refreshNamenodes datanodehost:port]");
       System.err.println("           [-deleteBlockPool datanode-host:port blockpoolId [force]]");
@@ -1197,6 +1229,8 @@ public class DFSAdmin extends FsShell {
         exitCode = refreshUserToGroupsMappings();
       } else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) {
         exitCode = refreshSuperUserGroupsConfiguration();
+      } else if ("-refreshCallQueue".equals(cmd)) {
+        exitCode = refreshCallQueue();
       } else if ("-printTopology".equals(cmd)) {
         exitCode = printTopology();
       } else if ("-refreshNamenodes".equals(cmd)) {

+ 13 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestIsMethodSupported.java

@@ -39,6 +39,8 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolClientSideTranslatorPB;
 import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
 import org.junit.AfterClass;
@@ -169,4 +171,15 @@ public class TestIsMethodSupported {
     assertTrue(
         translator.isMethodSupported("refreshUserToGroupsMappings"));
   }
+
+  @Test
+  public void testRefreshCallQueueProtocol() throws IOException {
+    RefreshCallQueueProtocolClientSideTranslatorPB translator =
+        (RefreshCallQueueProtocolClientSideTranslatorPB)
+        NameNodeProxies.createNonHAProxy(conf, nnAddress,
+            RefreshCallQueueProtocol.class,
+            UserGroupInformation.getCurrentUser(), true).getProxy();
+    assertTrue(
+        translator.isMethodSupported("refreshCallQueue"));
+  }
 }