Browse Source

HADOOP-10376: Merging r1602055 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1602057 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal 11 years ago
parent
commit
8314818b6f
17 changed files with 938 additions and 2 deletions
  1. 3 0
      hadoop-common-project/hadoop-common/CHANGES.txt
  2. 4 0
      hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
  3. 1 0
      hadoop-common-project/hadoop-common/pom.xml
  4. 3 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
  5. 49 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/GenericRefreshProtocol.java
  6. 35 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RefreshHandler.java
  7. 134 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RefreshRegistry.java
  8. 78 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RefreshResponse.java
  9. 119 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/GenericRefreshProtocolClientSideTranslatorPB.java
  10. 37 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/GenericRefreshProtocolPB.java
  11. 84 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/GenericRefreshProtocolServerSideTranslatorPB.java
  12. 61 0
      hadoop-common-project/hadoop-common/src/main/proto/GenericRefreshProtocol.proto
  13. 5 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
  14. 20 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  15. 2 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
  16. 76 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
  17. 227 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestGenericRefresh.java

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -72,6 +72,9 @@ Release 2.5.0 - UNRELEASED
     TCP RST and miss session expiration event due to bug in client connection
     management. (cnauroth)
 
+    HADOOP-10376. Refactor refresh*Protocols into a single generic
+    refreshConfigProtocol. (Chris Li via Arpit Agarwal)
+
   OPTIMIZATIONS
 
   BUG FIXES 

+ 4 - 0
hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml

@@ -305,6 +305,10 @@
       <!-- protobuf generated code -->
       <Class name="~org\.apache\.hadoop\.ipc\.proto\.RefreshCallQueueProtocolProtos.*"/>
     </Match>
+    <Match>
+      <!-- protobuf generated code -->
+      <Class name="~org\.apache\.hadoop\.ipc\.proto\.GenericRefreshProtocolProtos.*"/>
+    </Match>
 
     <!--
        Manually checked, misses child thread manually syncing on parent's intrinsic lock.

+ 1 - 0
hadoop-common-project/hadoop-common/pom.xml

@@ -334,6 +334,7 @@
                   <include>RefreshAuthorizationPolicyProtocol.proto</include>
                   <include>RefreshUserMappingsProtocol.proto</include>
                   <include>RefreshCallQueueProtocol.proto</include>
+                  <include>GenericRefreshProtocol.proto</include>
                 </includes>
               </source>
               <output>${project.build.directory}/generated-sources/java</output>

+ 3 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java

@@ -142,6 +142,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final String
   HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_CALLQUEUE =
       "security.refresh.callqueue.protocol.acl";
+  public static final String
+  HADOOP_SECURITY_SERVICE_AUTHORIZATION_GENERIC_REFRESH =
+      "security.refresh.generic.protocol.acl";
   public static final String 
   SECURITY_HA_SERVICE_PROTOCOL_ACL = "security.ha.service.protocol.acl";
   public static final String 

+ 49 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/GenericRefreshProtocol.java

@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.retry.Idempotent;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol which is used to refresh arbitrary things at runtime.
+ */
+@KerberosInfo(
+    serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface GenericRefreshProtocol {
+  /**
+   * Version 1: Initial version.
+   */
+  public static final long versionID = 1L;
+
+  /**
+   * Refresh the resource based on identity passed in.
+   * @throws IOException
+   */
+  @Idempotent
+  Collection<RefreshResponse> refresh(String identifier, String[] args)
+      throws IOException;
+}

+ 35 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RefreshHandler.java

@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Used to registry custom methods to refresh at runtime.
+ */
+@InterfaceStability.Unstable
+public interface RefreshHandler {
+  /**
+   * Implement this method to accept refresh requests from the administrator.
+   * @param identifier is the identifier you registered earlier
+   * @param args contains a list of string args from the administrator
+   * @throws Exception as a shorthand for a RefreshResponse(-1, message)
+   * @return a RefreshResponse
+   */
+  RefreshResponse handleRefresh(String identifier, String[] args);
+}

+ 134 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RefreshRegistry.java

@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Used to registry custom methods to refresh at runtime.
+ * Each identifier maps to one or more RefreshHandlers.
+ */
+@InterfaceStability.Unstable
+public class RefreshRegistry {
+  public static final Log LOG = LogFactory.getLog(RefreshRegistry.class);
+
+  // Used to hold singleton instance
+  private static class RegistryHolder {
+    @SuppressWarnings("All")
+    public static RefreshRegistry registry = new RefreshRegistry();
+  }
+
+  // Singleton access
+  public static RefreshRegistry defaultRegistry() {
+    return RegistryHolder.registry;
+  }
+
+  private final Multimap<String, RefreshHandler> handlerTable;
+
+  public RefreshRegistry() {
+    handlerTable = HashMultimap.create();
+  }
+
+  /**
+   * Registers an object as a handler for a given identity.
+   * Note: will prevent handler from being GC'd, object should unregister itself
+   *  when done
+   * @param identifier a unique identifier for this resource,
+   *                   such as org.apache.hadoop.blacklist
+   * @param handler the object to register
+   */
+  public synchronized void register(String identifier, RefreshHandler handler) {
+    if (identifier == null) {
+      throw new NullPointerException("Identifier cannot be null");
+    }
+    handlerTable.put(identifier, handler);
+  }
+
+  /**
+   * Remove the registered object for a given identity.
+   * @param identifier the resource to unregister
+   * @return the true if removed
+   */
+  public synchronized boolean unregister(String identifier, RefreshHandler handler) {
+    return handlerTable.remove(identifier, handler);
+  }
+
+  public synchronized void unregisterAll(String identifier) {
+    handlerTable.removeAll(identifier);
+  }
+
+  /**
+   * Lookup the responsible handler and return its result.
+   * This should be called by the RPC server when it gets a refresh request.
+   * @param identifier the resource to refresh
+   * @param args the arguments to pass on, not including the program name
+   * @throws IllegalArgumentException on invalid identifier
+   * @return the response from the appropriate handler
+   */
+  public synchronized Collection<RefreshResponse> dispatch(String identifier, String[] args) {
+    Collection<RefreshHandler> handlers = handlerTable.get(identifier);
+
+    if (handlers.size() == 0) {
+      String msg = "Identifier '" + identifier +
+        "' does not exist in RefreshRegistry. Valid options are: " +
+        Joiner.on(", ").join(handlerTable.keySet());
+
+      throw new IllegalArgumentException(msg);
+    }
+
+    ArrayList<RefreshResponse> responses =
+      new ArrayList<RefreshResponse>(handlers.size());
+
+    // Dispatch to each handler and store response
+    for(RefreshHandler handler : handlers) {
+      RefreshResponse response;
+
+      // Run the handler
+      try {
+        response = handler.handleRefresh(identifier, args);
+        if (response == null) {
+          throw new NullPointerException("Handler returned null.");
+        }
+
+        LOG.info(handlerName(handler) + " responds to '" + identifier +
+          "', says: '" + response.getMessage() + "', returns " +
+          response.getReturnCode());
+      } catch (Exception e) {
+        response = new RefreshResponse(-1, e.getLocalizedMessage());
+      }
+
+      response.setSenderName(handlerName(handler));
+      responses.add(response);
+    }
+
+    return responses;
+  }
+
+  private String handlerName(RefreshHandler h) {
+    return h.getClass().getName() + '@' + Integer.toHexString(h.hashCode());
+  }
+}

+ 78 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RefreshResponse.java

@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Return a response in the handler method for the user to see.
+ * Useful since you may want to display status to a user even though an
+ * error has not occurred.
+ */
+@InterfaceStability.Unstable
+public class RefreshResponse {
+  private int returnCode = -1;
+  private String message;
+  private String senderName;
+
+  /**
+   * Convenience method to create a response for successful refreshes.
+   * @return void response
+   */
+  public static RefreshResponse successResponse() {
+    return new RefreshResponse(0, "Success");
+  }
+
+  // Most RefreshHandlers will use this
+  public RefreshResponse(int returnCode, String message) {
+    this.returnCode = returnCode;
+    this.message = message;
+  }
+
+  /**
+   * Optionally set the sender of this RefreshResponse.
+   * This helps clarify things when multiple handlers respond.
+   * @param name The name of the sender
+   */
+  public void setSenderName(String name) {
+    senderName = name;
+  }
+  public String getSenderName() { return senderName; }
+
+  public int getReturnCode() { return returnCode; }
+  public void setReturnCode(int rc) { returnCode = rc; }
+
+  public void setMessage(String m) { message = m; }
+  public String getMessage() { return message; }
+
+  @Override
+  public String toString() {
+    String ret = "";
+
+    if (senderName != null) {
+      ret += senderName + ": ";
+    }
+
+    if (message != null) {
+      ret += message;
+    }
+
+    ret += " (exit " + returnCode + ")";
+    return ret;
+  }
+}

+ 119 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/GenericRefreshProtocolClientSideTranslatorPB.java

@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolMetaInterface;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RefreshResponse;
+import org.apache.hadoop.ipc.RpcClientUtil;
+import org.apache.hadoop.ipc.GenericRefreshProtocol;
+import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshRequestProto;
+import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshResponseProto;
+import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshResponseCollectionProto;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class GenericRefreshProtocolClientSideTranslatorPB implements
+    ProtocolMetaInterface, GenericRefreshProtocol, Closeable {
+
+  /** RpcController is not used and hence is set to null. */
+  private final static RpcController NULL_CONTROLLER = null;
+  private final GenericRefreshProtocolPB rpcProxy;
+
+  public GenericRefreshProtocolClientSideTranslatorPB(
+      GenericRefreshProtocolPB rpcProxy) {
+    this.rpcProxy = rpcProxy;
+  }
+
+  @Override
+  public void close() throws IOException {
+    RPC.stopProxy(rpcProxy);
+  }
+
+  @Override
+  public Collection<RefreshResponse> refresh(String identifier, String[] args) throws IOException {
+    List<String> argList = Arrays.asList(args);
+
+    try {
+      GenericRefreshRequestProto request = GenericRefreshRequestProto.newBuilder()
+        .setIdentifier(identifier)
+        .addAllArgs(argList)
+        .build();
+
+      GenericRefreshResponseCollectionProto resp = rpcProxy.refresh(NULL_CONTROLLER, request);
+      return unpack(resp);
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
+
+  private Collection<RefreshResponse> unpack(GenericRefreshResponseCollectionProto collection) {
+    List<GenericRefreshResponseProto> responseProtos = collection.getResponsesList();
+    List<RefreshResponse> responses = new ArrayList<RefreshResponse>();
+
+    for (GenericRefreshResponseProto rp : responseProtos) {
+      RefreshResponse response = unpack(rp);
+      responses.add(response);
+    }
+
+    return responses;
+  }
+
+  private RefreshResponse unpack(GenericRefreshResponseProto proto) {
+    // The default values
+    String message = null;
+    String sender = null;
+    int returnCode = -1;
+
+    // ... that can be overridden by data from the protobuf
+    if (proto.hasUserMessage()) {
+      message = proto.getUserMessage();
+    }
+    if (proto.hasExitStatus()) {
+      returnCode = proto.getExitStatus();
+    }
+    if (proto.hasSenderName()) {
+      sender = proto.getSenderName();
+    }
+
+    // ... and put into a RefreshResponse
+    RefreshResponse response = new RefreshResponse(returnCode, message);
+    response.setSenderName(sender);
+
+    return response;
+  }
+
+  @Override
+  public boolean isMethodSupported(String methodName) throws IOException {
+    return RpcClientUtil.isMethodSupported(rpcProxy,
+      GenericRefreshProtocolPB.class,
+      RPC.RpcKind.RPC_PROTOCOL_BUFFER,
+      RPC.getProtocolVersion(GenericRefreshProtocolPB.class),
+      methodName);
+  }
+}

+ 37 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/GenericRefreshProtocolPB.java

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService;
+
+@KerberosInfo(
+    serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
+@ProtocolInfo(
+    protocolName = "org.apache.hadoop.ipc.GenericRefreshProtocol",
+    protocolVersion = 1)
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+@InterfaceStability.Evolving
+public interface GenericRefreshProtocolPB extends
+  GenericRefreshProtocolService.BlockingInterface {
+}

+ 84 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/GenericRefreshProtocolServerSideTranslatorPB.java

@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc.protocolPB;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.ipc.GenericRefreshProtocol;
+import org.apache.hadoop.ipc.RefreshResponse;
+import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshRequestProto;
+import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshResponseProto;
+import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshResponseCollectionProto;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class GenericRefreshProtocolServerSideTranslatorPB implements
+    GenericRefreshProtocolPB {
+
+  private final GenericRefreshProtocol impl;
+
+  public GenericRefreshProtocolServerSideTranslatorPB(
+      GenericRefreshProtocol impl) {
+    this.impl = impl;
+  }
+
+  @Override
+  public GenericRefreshResponseCollectionProto refresh(
+      RpcController controller, GenericRefreshRequestProto request)
+      throws ServiceException {
+    try {
+      List<String> argList = request.getArgsList();
+      String[] args = argList.toArray(new String[argList.size()]);
+
+      if (!request.hasIdentifier()) {
+        throw new ServiceException("Request must contain identifier");
+      }
+
+      Collection<RefreshResponse> results = impl.refresh(request.getIdentifier(), args);
+
+      return pack(results);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  // Convert a collection of RefreshResponse objects to a
+  // RefreshResponseCollection proto
+  private GenericRefreshResponseCollectionProto pack(
+    Collection<RefreshResponse> responses) {
+    GenericRefreshResponseCollectionProto.Builder b =
+      GenericRefreshResponseCollectionProto.newBuilder();
+
+    for (RefreshResponse response : responses) {
+      GenericRefreshResponseProto.Builder respBuilder =
+        GenericRefreshResponseProto.newBuilder();
+      respBuilder.setExitStatus(response.getReturnCode());
+      respBuilder.setUserMessage(response.getMessage());
+      respBuilder.setSenderName(response.getSenderName());
+
+      // Add to collection
+      b.addResponses(respBuilder);
+    }
+
+    return b.build();
+  }
+}

+ 61 - 0
hadoop-common-project/hadoop-common/src/main/proto/GenericRefreshProtocol.proto

@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and stable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *stable* .proto interface.
+ */
+
+option java_package = "org.apache.hadoop.ipc.proto";
+option java_outer_classname = "GenericRefreshProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.common;
+
+/**
+ *  Refresh request.
+ */
+message GenericRefreshRequestProto {
+    optional string identifier = 1;
+    repeated string args = 2;
+}
+
+/**
+ * A single response from a refresh handler.
+ */
+message GenericRefreshResponseProto {
+    optional int32 exitStatus = 1; // unix exit status to return
+    optional string userMessage = 2; // to be displayed to the user
+    optional string senderName = 3; // which handler sent this message
+}
+
+/**
+ * Collection of responses from zero or more handlers.
+ */
+message GenericRefreshResponseCollectionProto {
+    repeated GenericRefreshResponseProto responses = 1;
+}
+
+/**
+ * Protocol which is used to refresh a user-specified feature.
+ */
+service GenericRefreshProtocolService {
+  rpc refresh(GenericRefreshRequestProto)
+      returns(GenericRefreshResponseCollectionProto);
+}

+ 5 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java

@@ -33,6 +33,7 @@ import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.authorize.Service;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
+import org.apache.hadoop.ipc.GenericRefreshProtocol;
 
 /**
  * {@link PolicyProvider} for HDFS protocols.
@@ -68,7 +69,10 @@ public class HDFSPolicyProvider extends PolicyProvider {
         GetUserMappingsProtocol.class),
     new Service(
         CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_CALLQUEUE,
-        RefreshCallQueueProtocol.class)
+        RefreshCallQueueProtocol.class),
+    new Service(
+        CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_GENERIC_REFRESH,
+        GenericRefreshProtocol.class)
   };
   
   @Override

+ 20 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -132,6 +132,8 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.WritableRpcEngine;
+import org.apache.hadoop.ipc.RefreshRegistry;
+import org.apache.hadoop.ipc.RefreshResponse;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Groups;
@@ -147,6 +149,9 @@ import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSi
 import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
 import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
@@ -229,6 +234,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
     BlockingService refreshCallQueueService = RefreshCallQueueProtocolService
         .newReflectiveBlockingService(refreshCallQueueXlator);
 
+    GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator =
+        new GenericRefreshProtocolServerSideTranslatorPB(this);
+    BlockingService genericRefreshService = GenericRefreshProtocolService
+        .newReflectiveBlockingService(genericRefreshXlator);
+
     GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator = 
         new GetUserMappingsProtocolServerSideTranslatorPB(this);
     BlockingService getUserMappingService = GetUserMappingsProtocolService
@@ -277,6 +287,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
       // We support Refreshing call queue here in case the client RPC queue is full
       DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
           refreshCallQueueService, serviceRpcServer);
+      DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
+          genericRefreshService, serviceRpcServer);
       DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, 
           getUserMappingService, serviceRpcServer);
   
@@ -317,6 +329,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
         refreshUserMappingService, clientRpcServer);
     DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
         refreshCallQueueService, clientRpcServer);
+    DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
+        genericRefreshService, clientRpcServer);
     DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, 
         getUserMappingService, clientRpcServer);
 
@@ -1150,6 +1164,12 @@ class NameNodeRpcServer implements NamenodeProtocols {
       serviceRpcServer.refreshCallQueue(conf);
     }
   }
+
+  @Override // GenericRefreshProtocol
+  public Collection<RefreshResponse> refresh(String identifier, String[] args) {
+    // Let the registry handle as needed
+    return RefreshRegistry.defaultRegistry().dispatch(identifier, args);
+  }
   
   @Override // GetUserMappingsProtocol
   public String[] getGroupsForUser(String user) throws IOException {

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
+import org.apache.hadoop.ipc.GenericRefreshProtocol;
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 
 /** The full set of RPC methods implemented by the Namenode.  */
@@ -35,6 +36,7 @@ public interface NamenodeProtocols
           RefreshAuthorizationPolicyProtocol,
           RefreshUserMappingsProtocol,
           RefreshCallQueueProtocol,
+          GenericRefreshProtocol,
           GetUserMappingsProtocol,
           HAServiceProtocol {
 }

+ 76 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java

@@ -26,6 +26,7 @@ import java.net.URL;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -62,12 +63,17 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
+import org.apache.hadoop.ipc.GenericRefreshProtocol;
+import org.apache.hadoop.ipc.RefreshResponse;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
-import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -688,6 +694,7 @@ public class DFSAdmin extends FsShell {
       "\t[-refreshUserToGroupsMappings]\n" +
       "\t[-refreshSuperUserGroupsConfiguration]\n" +
       "\t[-refreshCallQueue]\n" +
+      "\t[-refresh <host:ipc_port> <key> [arg1..argn]\n" +
       "\t[-printTopology]\n" +
       "\t[-refreshNamenodes datanodehost:port]\n"+
       "\t[-deleteBlockPool datanodehost:port blockpoolId [force]]\n"+
@@ -764,6 +771,10 @@ public class DFSAdmin extends FsShell {
 
     String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n";
 
+    String genericRefresh = "-refresh: Arguments are <hostname:port> <resource_identifier> [arg1..argn]\n" +
+      "\tTriggers a runtime-refresh of the resource specified by <resource_identifier>\n" +
+      "\ton <hostname:port>. All other args after are sent to the host.";
+
     String printTopology = "-printTopology: Print a tree of the racks and their\n" +
                            "\t\tnodes as reported by the Namenode\n";
     
@@ -848,6 +859,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(refreshSuperUserGroupsConfiguration);
     } else if ("refreshCallQueue".equals(cmd)) {
       System.out.println(refreshCallQueue);
+    } else if ("refresh".equals(cmd)) {
+      System.out.println(genericRefresh);
     } else if ("printTopology".equals(cmd)) {
       System.out.println(printTopology);
     } else if ("refreshNamenodes".equals(cmd)) {
@@ -887,6 +900,7 @@ public class DFSAdmin extends FsShell {
       System.out.println(refreshUserToGroupsMappings);
       System.out.println(refreshSuperUserGroupsConfiguration);
       System.out.println(refreshCallQueue);
+      System.out.println(genericRefresh);
       System.out.println(printTopology);
       System.out.println(refreshNamenodes);
       System.out.println(deleteBlockPool);
@@ -1100,6 +1114,56 @@ public class DFSAdmin extends FsShell {
     return 0;
   }
 
+  public int genericRefresh(String[] argv, int i) throws IOException {
+    String hostport = argv[i++];
+    String identifier = argv[i++];
+    String[] args = Arrays.copyOfRange(argv, i, argv.length);
+
+    // Get the current configuration
+    Configuration conf = getConf();
+
+    // for security authorization
+    // server principal for this call
+    // should be NN's one.
+    conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
+      conf.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, ""));
+
+    // Create the client
+    Class<?> xface = GenericRefreshProtocolPB.class;
+    InetSocketAddress address = NetUtils.createSocketAddr(hostport);
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+    RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
+    GenericRefreshProtocolPB proxy = (GenericRefreshProtocolPB)
+      RPC.getProxy(xface, RPC.getProtocolVersion(xface), address,
+        ugi, conf, NetUtils.getDefaultSocketFactory(conf), 0);
+
+    GenericRefreshProtocol xlator =
+      new GenericRefreshProtocolClientSideTranslatorPB(proxy);
+
+    // Refresh
+    Collection<RefreshResponse> responses = xlator.refresh(identifier, args);
+
+    int returnCode = 0;
+
+    // Print refresh responses
+    System.out.println("Refresh Responses:\n");
+    for (RefreshResponse response : responses) {
+      System.out.println(response.toString());
+
+      if (returnCode == 0 && response.getReturnCode() != 0) {
+        // This is the first non-zero return code, so we should return this
+        returnCode = response.getReturnCode();
+      } else if (returnCode != 0 && response.getReturnCode() != 0) {
+        // Then now we have multiple non-zero return codes,
+        // so we merge them into -1
+        returnCode = -1;
+      }
+    }
+
+    return returnCode;
+  }
+
   /**
    * Displays format of commands.
    * @param cmd The command that is being executed.
@@ -1162,6 +1226,9 @@ public class DFSAdmin extends FsShell {
     } else if ("-refreshCallQueue".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " [-refreshCallQueue]");
+    } else if ("-refresh".equals(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+                         + " [-refresh <hostname:port> <resource_identifier> [arg1..argn]");
     } else if ("-printTopology".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " [-printTopology]");
@@ -1195,6 +1262,7 @@ public class DFSAdmin extends FsShell {
       System.err.println("           [-refreshUserToGroupsMappings]");
       System.err.println("           [-refreshSuperUserGroupsConfiguration]");
       System.err.println("           [-refreshCallQueue]");
+      System.err.println("           [-refresh]");
       System.err.println("           [-printTopology]");
       System.err.println("           [-refreshNamenodes datanodehost:port]");
       System.err.println("           [-deleteBlockPool datanode-host:port blockpoolId [force]]");
@@ -1292,6 +1360,11 @@ public class DFSAdmin extends FsShell {
         printUsage(cmd);
         return exitCode;
       }
+    } else if ("-refresh".equals(cmd)) {
+      if (argv.length < 3) {
+        printUsage(cmd);
+        return exitCode;
+      }
     } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
       if (argv.length != 1) {
         printUsage(cmd);
@@ -1387,6 +1460,8 @@ public class DFSAdmin extends FsShell {
         exitCode = refreshSuperUserGroupsConfiguration();
       } else if ("-refreshCallQueue".equals(cmd)) {
         exitCode = refreshCallQueue();
+      } else if ("-refresh".equals(cmd)) {
+        exitCode = genericRefresh(argv, i);
       } else if ("-printTopology".equals(cmd)) {
         exitCode = printTopology();
       } else if ("-refreshNamenodes".equals(cmd)) {

+ 227 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestGenericRefresh.java

@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.ipc.RefreshHandler;
+
+import org.apache.hadoop.ipc.RefreshRegistry;
+import org.apache.hadoop.ipc.RefreshResponse;
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.mockito.Mockito;
+
+/**
+ * Before all tests, a MiniDFSCluster is spun up.
+ * Before each test, mock refresh handlers are created and registered.
+ * After each test, the mock handlers are unregistered.
+ * After all tests, the cluster is spun down.
+ */
+public class TestGenericRefresh {
+  private static MiniDFSCluster cluster;
+  private static Configuration config;
+  private static final int NNPort = 54222;
+
+  private static RefreshHandler firstHandler;
+  private static RefreshHandler secondHandler;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    config = new Configuration();
+    config.set("hadoop.security.authorization", "true");
+
+    FileSystem.setDefaultUri(config, "hdfs://localhost:" + NNPort);
+    cluster = new MiniDFSCluster.Builder(config).nameNodePort(NNPort).build();
+    cluster.waitActive();
+  }
+
+  @AfterClass
+  public static void tearDownBeforeClass() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    // Register Handlers, first one just sends an ok response
+    firstHandler = Mockito.mock(RefreshHandler.class);
+    Mockito.stub(firstHandler.handleRefresh(Mockito.anyString(), Mockito.any(String[].class)))
+      .toReturn(RefreshResponse.successResponse());
+    RefreshRegistry.defaultRegistry().register("firstHandler", firstHandler);
+
+    // Second handler has conditional response for testing args
+    secondHandler = Mockito.mock(RefreshHandler.class);
+    Mockito.stub(secondHandler.handleRefresh("secondHandler", new String[]{"one", "two"}))
+      .toReturn(new RefreshResponse(3, "three"));
+    Mockito.stub(secondHandler.handleRefresh("secondHandler", new String[]{"one"}))
+      .toReturn(new RefreshResponse(2, "two"));
+    RefreshRegistry.defaultRegistry().register("secondHandler", secondHandler);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    RefreshRegistry.defaultRegistry().unregisterAll("firstHandler");
+    RefreshRegistry.defaultRegistry().unregisterAll("secondHandler");
+  }
+
+  @Test
+  public void testInvalidCommand() throws Exception {
+    DFSAdmin admin = new DFSAdmin(config);
+    String [] args = new String[]{"-refresh", "nn"};
+    int exitCode = admin.run(args);
+    assertEquals("DFSAdmin should fail due to bad args", -1, exitCode);
+  }
+
+  @Test
+  public void testInvalidIdentifier() throws Exception {
+    DFSAdmin admin = new DFSAdmin(config);
+    String [] args = new String[]{"-refresh", "localhost:" + NNPort, "unregisteredIdentity"};
+    int exitCode = admin.run(args);
+    assertEquals("DFSAdmin should fail due to no handler registered", -1, exitCode);
+  }
+
+  @Test
+  public void testValidIdentifier() throws Exception {
+    DFSAdmin admin = new DFSAdmin(config);
+    String[] args = new String[]{"-refresh", "localhost:" + NNPort, "firstHandler"};
+    int exitCode = admin.run(args);
+    assertEquals("DFSAdmin should succeed", 0, exitCode);
+
+    Mockito.verify(firstHandler).handleRefresh("firstHandler", new String[]{});
+    // Second handler was never called
+    Mockito.verify(secondHandler, Mockito.never())
+      .handleRefresh(Mockito.anyString(), Mockito.any(String[].class));
+  }
+
+  @Test
+  public void testVariableArgs() throws Exception {
+    DFSAdmin admin = new DFSAdmin(config);
+    String[] args = new String[]{"-refresh", "localhost:" + NNPort, "secondHandler", "one"};
+    int exitCode = admin.run(args);
+    assertEquals("DFSAdmin should return 2", 2, exitCode);
+
+    exitCode = admin.run(new String[]{"-refresh", "localhost:" + NNPort, "secondHandler", "one", "two"});
+    assertEquals("DFSAdmin should now return 3", 3, exitCode);
+
+    Mockito.verify(secondHandler).handleRefresh("secondHandler", new String[]{"one"});
+    Mockito.verify(secondHandler).handleRefresh("secondHandler", new String[]{"one", "two"});
+  }
+
+  @Test
+  public void testUnregistration() throws Exception {
+    RefreshRegistry.defaultRegistry().unregisterAll("firstHandler");
+
+    // And now this should fail
+    DFSAdmin admin = new DFSAdmin(config);
+    String[] args = new String[]{"-refresh", "localhost:" + NNPort, "firstHandler"};
+    int exitCode = admin.run(args);
+    assertEquals("DFSAdmin should return -1", -1, exitCode);
+  }
+
+  @Test
+  public void testUnregistrationReturnValue() {
+    RefreshHandler mockHandler = Mockito.mock(RefreshHandler.class);
+    RefreshRegistry.defaultRegistry().register("test", mockHandler);
+    boolean ret = RefreshRegistry.defaultRegistry().unregister("test", mockHandler);
+    assertTrue(ret);
+  }
+
+  @Test
+  public void testMultipleRegistration() throws Exception {
+    RefreshRegistry.defaultRegistry().register("sharedId", firstHandler);
+    RefreshRegistry.defaultRegistry().register("sharedId", secondHandler);
+
+    // this should trigger both
+    DFSAdmin admin = new DFSAdmin(config);
+    String[] args = new String[]{"-refresh", "localhost:" + NNPort, "sharedId", "one"};
+    int exitCode = admin.run(args);
+    assertEquals(-1, exitCode); // -1 because one of the responses is unregistered
+
+    // verify we called both
+    Mockito.verify(firstHandler).handleRefresh("sharedId", new String[]{"one"});
+    Mockito.verify(secondHandler).handleRefresh("sharedId", new String[]{"one"});
+
+    RefreshRegistry.defaultRegistry().unregisterAll("sharedId");
+  }
+
+  @Test
+  public void testMultipleReturnCodeMerging() throws Exception {
+    // Two handlers which return two non-zero values
+    RefreshHandler handlerOne = Mockito.mock(RefreshHandler.class);
+    Mockito.stub(handlerOne.handleRefresh(Mockito.anyString(), Mockito.any(String[].class)))
+      .toReturn(new RefreshResponse(23, "Twenty Three"));
+
+    RefreshHandler handlerTwo = Mockito.mock(RefreshHandler.class);
+    Mockito.stub(handlerTwo.handleRefresh(Mockito.anyString(), Mockito.any(String[].class)))
+      .toReturn(new RefreshResponse(10, "Ten"));
+
+    // Then registered to the same ID
+    RefreshRegistry.defaultRegistry().register("shared", handlerOne);
+    RefreshRegistry.defaultRegistry().register("shared", handlerTwo);
+
+    // We refresh both
+    DFSAdmin admin = new DFSAdmin(config);
+    String[] args = new String[]{"-refresh", "localhost:" + NNPort, "shared"};
+    int exitCode = admin.run(args);
+    assertEquals(-1, exitCode); // We get -1 because of our logic for melding non-zero return codes
+
+    // Verify we called both
+    Mockito.verify(handlerOne).handleRefresh("shared", new String[]{});
+    Mockito.verify(handlerTwo).handleRefresh("shared", new String[]{});
+
+    RefreshRegistry.defaultRegistry().unregisterAll("shared");
+  }
+
+  @Test
+  public void testExceptionResultsInNormalError() throws Exception {
+    // In this test, we ensure that all handlers are called even if we throw an exception in one
+    RefreshHandler exceptionalHandler = Mockito.mock(RefreshHandler.class);
+    Mockito.stub(exceptionalHandler.handleRefresh(Mockito.anyString(), Mockito.any(String[].class)))
+      .toThrow(new RuntimeException("Exceptional Handler Throws Exception"));
+
+    RefreshHandler otherExceptionalHandler = Mockito.mock(RefreshHandler.class);
+    Mockito.stub(otherExceptionalHandler.handleRefresh(Mockito.anyString(), Mockito.any(String[].class)))
+      .toThrow(new RuntimeException("More Exceptions"));
+
+    RefreshRegistry.defaultRegistry().register("exceptional", exceptionalHandler);
+    RefreshRegistry.defaultRegistry().register("exceptional", otherExceptionalHandler);
+
+    DFSAdmin admin = new DFSAdmin(config);
+    String[] args = new String[]{"-refresh", "localhost:" + NNPort, "exceptional"};
+    int exitCode = admin.run(args);
+    assertEquals(-1, exitCode); // Exceptions result in a -1
+
+    Mockito.verify(exceptionalHandler).handleRefresh("exceptional", new String[]{});
+    Mockito.verify(otherExceptionalHandler).handleRefresh("exceptional", new String[]{});
+
+    RefreshRegistry.defaultRegistry().unregisterAll("exceptional");
+  }
+}