Просмотр исходного кода

AMBARI-20736. Allow Potentially Long Running Restart Commands To Have Their Own Timeout (ncole)

Nate Cole 8 лет назад
Родитель
Коммит
caad3e77c8
23 изменённых файла: 218 добавлений и 30 удалений
  1. 18 5
      ambari-common/src/main/python/resource_management/libraries/functions/decorator.py
  2. 9 3
      ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java
  3. 5 3
      ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
  4. 1 1
      ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java
  5. 65 0
      ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java
  6. 6 0
      ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java
  7. 23 2
      ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapper.java
  8. 4 1
      ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java
  9. 10 1
      ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py
  10. 2 0
      ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
  11. 1 1
      ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.3.xml
  12. 1 1
      ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.4.xml
  13. 1 1
      ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.5.xml
  14. 1 1
      ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.6.xml
  15. 1 1
      ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.4.xml
  16. 1 1
      ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.5.xml
  17. 1 1
      ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.6.xml
  18. 1 1
      ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.5.xml
  19. 1 1
      ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.6.xml
  20. 1 1
      ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/upgrade-2.6.xml
  21. 1 0
      ambari-server/src/main/resources/upgrade-pack.xsd
  22. 63 3
      ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java
  23. 1 1
      ambari-server/src/test/resources/stacks/HDP/2.1.1/upgrades/upgrade_test.xml

+ 18 - 5
ambari-common/src/main/python/resource_management/libraries/functions/decorator.py

@@ -26,13 +26,15 @@ __all__ = ['retry', 'safe_retry', ]
 from resource_management.core.logger import Logger
 from resource_management.core.logger import Logger
 
 
 
 
-def retry(times=3, sleep_time=1, max_sleep_time=8, backoff_factor=1, err_class=Exception):
+def retry(times=3, sleep_time=1, max_sleep_time=8, backoff_factor=1, err_class=Exception, timeout_func=None):
   """
   """
   Retry decorator for improved robustness of functions.
   Retry decorator for improved robustness of functions.
-  :param times: Number of times to attempt to call the function.
+  :param times: Number of times to attempt to call the function.  Optionally specify the timeout_func.
   :param sleep_time: Initial sleep time between attempts
   :param sleep_time: Initial sleep time between attempts
   :param backoff_factor: After every failed attempt, multiple the previous sleep time by this factor.
   :param backoff_factor: After every failed attempt, multiple the previous sleep time by this factor.
   :param err_class: Exception class to handle
   :param err_class: Exception class to handle
+  :param timeout_func: used when the 'times' argument should be computed.  this function should
+         return an integer value that indicates the number of seconds to wait
   :return: Returns the output of the wrapped function.
   :return: Returns the output of the wrapped function.
   """
   """
   def decorator(function):
   def decorator(function):
@@ -42,6 +44,10 @@ def retry(times=3, sleep_time=1, max_sleep_time=8, backoff_factor=1, err_class=E
       _backoff_factor = backoff_factor
       _backoff_factor = backoff_factor
       _err_class = err_class
       _err_class = err_class
 
 
+      if timeout_func is not None:
+        timeout = timeout_func()
+        _times = timeout // sleep_time  # ensure we end up with an integer
+
       while _times > 1:
       while _times > 1:
         _times -= 1
         _times -= 1
         try:
         try:
@@ -49,7 +55,8 @@ def retry(times=3, sleep_time=1, max_sleep_time=8, backoff_factor=1, err_class=E
         except _err_class, err:
         except _err_class, err:
           Logger.info("Will retry %d time(s), caught exception: %s. Sleeping for %d sec(s)" % (_times, str(err), _sleep_time))
           Logger.info("Will retry %d time(s), caught exception: %s. Sleeping for %d sec(s)" % (_times, str(err), _sleep_time))
           time.sleep(_sleep_time)
           time.sleep(_sleep_time)
-        if(_sleep_time * _backoff_factor <= max_sleep_time):
+
+        if _sleep_time * _backoff_factor <= max_sleep_time:
           _sleep_time *= _backoff_factor
           _sleep_time *= _backoff_factor
 
 
       return function(*args, **kwargs)
       return function(*args, **kwargs)
@@ -57,15 +64,17 @@ def retry(times=3, sleep_time=1, max_sleep_time=8, backoff_factor=1, err_class=E
   return decorator
   return decorator
 
 
 
 
-def safe_retry(times=3, sleep_time=1, max_sleep_time=8, backoff_factor=1, err_class=Exception, return_on_fail=None):
+def safe_retry(times=3, sleep_time=1, max_sleep_time=8, backoff_factor=1, err_class=Exception, return_on_fail=None, timeout_func=None):
   """
   """
   Retry decorator for improved robustness of functions. Instead of error generation on the last try, will return
   Retry decorator for improved robustness of functions. Instead of error generation on the last try, will return
   return_on_fail value.
   return_on_fail value.
-  :param times: Number of times to attempt to call the function.
+  :param times: Number of times to attempt to call the function.  Optionally specify the timeout_func.
   :param sleep_time: Initial sleep time between attempts
   :param sleep_time: Initial sleep time between attempts
   :param backoff_factor: After every failed attempt, multiple the previous sleep time by this factor.
   :param backoff_factor: After every failed attempt, multiple the previous sleep time by this factor.
   :param err_class: Exception class to handle
   :param err_class: Exception class to handle
   :param return_on_fail value to return on the last try
   :param return_on_fail value to return on the last try
+  :param timeout_func: used when the 'times' argument should be computed.  this function should
+         return an integer value that indicates the number of seconds to wait
   :return: Returns the output of the wrapped function.
   :return: Returns the output of the wrapped function.
   """
   """
   def decorator(function):
   def decorator(function):
@@ -76,6 +85,10 @@ def safe_retry(times=3, sleep_time=1, max_sleep_time=8, backoff_factor=1, err_cl
       _err_class = err_class
       _err_class = err_class
       _return_on_fail = return_on_fail
       _return_on_fail = return_on_fail
 
 
+      if timeout_func is not None:
+        timeout = timeout_func()
+        _times = timeout // sleep_time  # ensure we end up with an integer
+
       while _times > 1:
       while _times > 1:
         _times -= 1
         _times -= 1
         try:
         try:

+ 9 - 3
ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelper.java

@@ -443,7 +443,7 @@ public class AmbariCustomCommandExecutionHelper {
       }
       }
 
 
       boolean isInstallCommand = commandName.equals(RoleCommand.INSTALL.toString());
       boolean isInstallCommand = commandName.equals(RoleCommand.INSTALL.toString());
-      String commandTimeout = configs.getDefaultAgentTaskTimeout(isInstallCommand);
+      int commandTimeout = Short.valueOf(configs.getDefaultAgentTaskTimeout(isInstallCommand)).intValue();
 
 
       if (serviceInfo.getSchemaVersion().equals(AmbariMetaInfo.SCHEMA_VERSION_2)) {
       if (serviceInfo.getSchemaVersion().equals(AmbariMetaInfo.SCHEMA_VERSION_2)) {
         // Service check command is not custom command
         // Service check command is not custom command
@@ -453,7 +453,7 @@ public class AmbariCustomCommandExecutionHelper {
           commandParams.put(SCRIPT, script.getScript());
           commandParams.put(SCRIPT, script.getScript());
           commandParams.put(SCRIPT_TYPE, script.getScriptType().toString());
           commandParams.put(SCRIPT_TYPE, script.getScriptType().toString());
           if (script.getTimeout() > 0) {
           if (script.getTimeout() > 0) {
-            commandTimeout = String.valueOf(script.getTimeout());
+            commandTimeout = script.getTimeout();
           }
           }
         } else {
         } else {
           String message = String.format("Component %s has not command script " +
           String message = String.format("Component %s has not command script " +
@@ -464,7 +464,13 @@ public class AmbariCustomCommandExecutionHelper {
         // We don't need package/repo information to perform service check
         // We don't need package/repo information to perform service check
       }
       }
 
 
-      commandParams.put(COMMAND_TIMEOUT, commandTimeout);
+      // !!! the action execution context timeout is the final say, but make sure it's at least 60 seconds
+      if (null != actionExecutionContext.getTimeout()) {
+        commandTimeout = actionExecutionContext.getTimeout().intValue();
+        commandTimeout = Math.max(60, commandTimeout);
+      }
+
+      commandParams.put(COMMAND_TIMEOUT, "" + commandTimeout);
       commandParams.put(SERVICE_PACKAGE_FOLDER, serviceInfo.getServicePackageFolder());
       commandParams.put(SERVICE_PACKAGE_FOLDER, serviceInfo.getServicePackageFolder());
       commandParams.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder());
       commandParams.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder());
 
 

+ 5 - 3
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java

@@ -1298,6 +1298,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
       String serviceName = wrapper.getTasks().get(0).getService();
       String serviceName = wrapper.getTasks().get(0).getService();
       ServiceInfo serviceInfo = ambariMetaInfo.getService(stackId.getStackName(),
       ServiceInfo serviceInfo = ambariMetaInfo.getService(stackId.getStackName(),
           stackId.getStackVersion(), serviceName);
           stackId.getStackVersion(), serviceName);
+
       params.put(SERVICE_PACKAGE_FOLDER, serviceInfo.getServicePackageFolder());
       params.put(SERVICE_PACKAGE_FOLDER, serviceInfo.getServicePackageFolder());
       params.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder());
       params.put(HOOKS_FOLDER, stackInfo.getStackHooksFolder());
     }
     }
@@ -1308,7 +1309,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     // hosts in maintenance mode are excluded from the upgrade
     // hosts in maintenance mode are excluded from the upgrade
     actionContext.setMaintenanceModeHostExcluded(true);
     actionContext.setMaintenanceModeHostExcluded(true);
 
 
-    actionContext.setTimeout(Short.valueOf(s_configuration.getDefaultAgentTaskTimeout(false)));
+    actionContext.setTimeout(wrapper.getMaxTimeout(s_configuration));
     actionContext.setRetryAllowed(allowRetry);
     actionContext.setRetryAllowed(allowRetry);
     actionContext.setAutoSkipFailures(context.isComponentFailureAutoSkipped());
     actionContext.setAutoSkipFailures(context.isComponentFailureAutoSkipped());
 
 
@@ -1388,7 +1389,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
 
 
     ActionExecutionContext actionContext = new ActionExecutionContext(cluster.getClusterName(),
     ActionExecutionContext actionContext = new ActionExecutionContext(cluster.getClusterName(),
         function, filters, commandParams);
         function, filters, commandParams);
-    actionContext.setTimeout(Short.valueOf(s_configuration.getDefaultAgentTaskTimeout(false)));
+    actionContext.setTimeout(wrapper.getMaxTimeout(s_configuration));
     actionContext.setRetryAllowed(allowRetry);
     actionContext.setRetryAllowed(allowRetry);
     actionContext.setAutoSkipFailures(context.isComponentFailureAutoSkipped());
     actionContext.setAutoSkipFailures(context.isComponentFailureAutoSkipped());
 
 
@@ -1424,6 +1425,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     }
     }
 
 
     s_commandExecutionHelper.get().addExecutionCommandsToStage(actionContext, stage, requestParams);
     s_commandExecutionHelper.get().addExecutionCommandsToStage(actionContext, stage, requestParams);
+
     request.addStages(Collections.singletonList(stage));
     request.addStages(Collections.singletonList(stage));
   }
   }
 
 
@@ -1448,7 +1450,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     ActionExecutionContext actionContext = new ActionExecutionContext(cluster.getClusterName(),
     ActionExecutionContext actionContext = new ActionExecutionContext(cluster.getClusterName(),
         "SERVICE_CHECK", filters, commandParams);
         "SERVICE_CHECK", filters, commandParams);
 
 
-    actionContext.setTimeout(Short.valueOf(s_configuration.getDefaultAgentTaskTimeout(false)));
+    actionContext.setTimeout(wrapper.getMaxTimeout(s_configuration));
     actionContext.setRetryAllowed(allowRetry);
     actionContext.setRetryAllowed(allowRetry);
     actionContext.setAutoSkipFailures(context.isServiceCheckFailureAutoSkipped());
     actionContext.setAutoSkipFailures(context.isServiceCheckFailureAutoSkipped());
 
 

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java

@@ -129,6 +129,7 @@ public class Grouping {
       for (TaskBucket bucket : buckets) {
       for (TaskBucket bucket : buckets) {
         // The TaskWrappers take into account if a task is meant to run on all, any, or master.
         // The TaskWrappers take into account if a task is meant to run on all, any, or master.
         // A TaskWrapper may contain multiple tasks, but typically only one, and they all run on the same set of hosts.
         // A TaskWrapper may contain multiple tasks, but typically only one, and they all run on the same set of hosts.
+        // Generate a task wrapper for every task in the bucket
         List<TaskWrapper> preTasks = TaskWrapperBuilder.getTaskList(service, pc.name, hostsType, bucket.tasks, params);
         List<TaskWrapper> preTasks = TaskWrapperBuilder.getTaskList(service, pc.name, hostsType, bucket.tasks, params);
         List<List<TaskWrapper>> organizedTasks = organizeTaskWrappersBySyncRules(preTasks);
         List<List<TaskWrapper>> organizedTasks = organizeTaskWrappersBySyncRules(preTasks);
         for (List<TaskWrapper> tasks : organizedTasks) {
         for (List<TaskWrapper> tasks : organizedTasks) {
@@ -219,7 +220,6 @@ public class Grouping {
         int batchNum = 0;
         int batchNum = 0;
         for (Set<String> hostSubset : hostSets) {
         for (Set<String> hostSubset : hostSets) {
           batchNum++;
           batchNum++;
-          TaskWrapper expandedTW = new TaskWrapper(tw.getService(), tw.getComponent(), hostSubset, tw.getParams(), tw.getTasks());
 
 
           String stageText = getStageText(verb, ctx.getComponentDisplay(service, pc.name), hostSubset, batchNum, numBatchesNeeded);
           String stageText = getStageText(verb, ctx.getComponentDisplay(service, pc.name), hostSubset, batchNum, numBatchesNeeded);
 
 

+ 65 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapper.java

@@ -25,6 +25,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Set;
 import java.util.Set;
 
 
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Objects;
 import com.google.common.base.Objects;
 import com.google.gson.Gson;
 import com.google.gson.Gson;
 
 
@@ -33,6 +39,8 @@ import com.google.gson.Gson;
  */
  */
 public class StageWrapper {
 public class StageWrapper {
 
 
+  private static final Logger LOG = LoggerFactory.getLogger(StageWrapper.class);
+
   private static Gson gson = new Gson();
   private static Gson gson = new Gson();
   private String text;
   private String text;
   private Type type;
   private Type type;
@@ -163,4 +171,61 @@ public class StageWrapper {
         .add("text",text)
         .add("text",text)
         .omitNullValues().toString();
         .omitNullValues().toString();
   }
   }
+
+  /**
+   * Gets the maximum timeout for any task that this {@code StageWrapper} encapsulates.  TaskWrappers
+   * are homogeneous across the stage, but timeouts are defined in Upgrade Packs
+   * at the task, so each one should be checked individually.
+   *
+   * <p>
+   * WARNING:  This method relies on incorrect assumptions about {@link StageWrapper}s and the {@link TaskWrapper}s
+   * that are contained in them.  Orchestration is currently forcing a StageWrapper to have only one TaskWrapper,
+   * even though they could have many per the code.
+   *
+   * In addition, a TaskWrapper should have a one-to-one reference with the Task it contains.  That will be
+   * fixed in a future release.
+   * </p>
+   *
+   * @param configuration the configuration instance.  StageWrappers are not injectable, so pass
+   *                      this in.
+   * @return the maximum timeout, or the default agent execution timeout if none are found.  Never {@code null}.
+   */
+  public Short getMaxTimeout(Configuration configuration) {
+
+    Set<String> timeoutKeys = new HashSet<>();
+
+    // !!! FIXME a TaskWrapper should have only one task.
+    for (TaskWrapper wrapper : tasks) {
+      timeoutKeys.addAll(wrapper.getTimeoutKeys());
+    }
+
+    Short defaultTimeout = Short.valueOf(configuration.getDefaultAgentTaskTimeout(false));
+
+    if (CollectionUtils.isEmpty(timeoutKeys)) {
+      return defaultTimeout;
+    }
+
+    Short timeout = null;
+
+    for (String key : timeoutKeys) {
+      String configValue = configuration.getProperty(key);
+
+      if (StringUtils.isNotBlank(configValue)) {
+        try {
+          Short configTimeout = Short.valueOf(configValue);
+
+          if (null == timeout || configTimeout > timeout) {
+            timeout = configTimeout;
+          }
+
+        } catch (Exception e) {
+          LOG.warn("Could not parse {}/{} to a timeout value", key, configValue);
+        }
+      } else {
+        LOG.warn("Configuration {} not found to compute timeout", key);
+      }
+    }
+
+    return null == timeout ? defaultTimeout : timeout;
+  }
 }
 }

+ 6 - 0
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Task.java

@@ -41,6 +41,12 @@ public abstract class Task {
   @XmlAttribute(name = "sequential")
   @XmlAttribute(name = "sequential")
   public boolean isSequential = false;
   public boolean isSequential = false;
 
 
+  /**
+   * The config property to check for timeout.
+   */
+  @XmlAttribute(name="timeout-config")
+  public String timeoutConfig = null;
+
   /**
   /**
    * @return the type of the task
    * @return the type of the task
    */
    */

+ 23 - 2
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapper.java

@@ -19,10 +19,13 @@ package org.apache.ambari.server.state.stack.upgrade;
 
 
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Set;
 import java.util.Set;
 
 
+import org.apache.commons.lang.StringUtils;
+
 import com.google.common.base.Objects;
 import com.google.common.base.Objects;
 
 
 /**
 /**
@@ -34,7 +37,9 @@ public class TaskWrapper {
   private String component;
   private String component;
   private Set<String> hosts; // all the hosts that all the tasks must run
   private Set<String> hosts; // all the hosts that all the tasks must run
   private Map<String, String> params;
   private Map<String, String> params;
+  /* FIXME a TaskWrapper really should be wrapping ONLY ONE task */
   private List<Task> tasks; // all the tasks defined for the hostcomponent
   private List<Task> tasks; // all the tasks defined for the hostcomponent
+  private Set<String> timeoutKeys = new HashSet<>();
 
 
   /**
   /**
    * @param s the service name for the tasks
    * @param s the service name for the tasks
@@ -42,10 +47,11 @@ public class TaskWrapper {
    * @param hosts the set of hosts that the tasks are for
    * @param hosts the set of hosts that the tasks are for
    * @param tasks an array of tasks as a convenience
    * @param tasks an array of tasks as a convenience
    */
    */
-  public TaskWrapper(String s, String c, Set<String> hosts, Task... tasks) {
-    this(s, c, hosts, null, Arrays.asList(tasks));
+  public TaskWrapper(String s, String c, Set<String> hosts, Task task) {
+    this(s, c, hosts, null, task);
   }
   }
 
 
+
   /**
   /**
    * @param s the service name for the tasks
    * @param s the service name for the tasks
    * @param c the component name for the tasks
    * @param c the component name for the tasks
@@ -71,6 +77,13 @@ public class TaskWrapper {
     this.hosts = hosts;
     this.hosts = hosts;
     this.params = (params == null) ? new HashMap<String, String>() : params;
     this.params = (params == null) ? new HashMap<String, String>() : params;
     this.tasks = tasks;
     this.tasks = tasks;
+
+    // !!! FIXME there should only be one task
+    for (Task task : tasks) {
+      if (StringUtils.isNotBlank(task.timeoutConfig)) {
+        timeoutKeys.add(task.timeoutConfig);
+      }
+    }
   }
   }
 
 
   /**
   /**
@@ -133,4 +146,12 @@ public class TaskWrapper {
     return false;
     return false;
   }
   }
 
 
+
+  /**
+   * @return the timeout keys for all the tasks in this wrapper.
+   */
+  public Set<String> getTimeoutKeys() {
+    return timeoutKeys;
+  }
+
 }
 }

+ 4 - 1
ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/TaskWrapperBuilder.java

@@ -40,13 +40,16 @@ public class TaskWrapperBuilder {
   private static Logger LOG = LoggerFactory.getLogger(TaskWrapperBuilder.class);
   private static Logger LOG = LoggerFactory.getLogger(TaskWrapperBuilder.class);
 
 
   /**
   /**
-   * Creates a collection of tasks based on the set of hosts they are allowed to run on
+   * Creates a collection of task wrappers based on the set of hosts they are allowed to run on
    * by analyzing the "hosts" attribute of any ExecuteTask objects.
    * by analyzing the "hosts" attribute of any ExecuteTask objects.
+   *
    * @param service the service name for the tasks
    * @param service the service name for the tasks
    * @param component the component name for the tasks
    * @param component the component name for the tasks
    * @param hostsType the collection of sets along with their status
    * @param hostsType the collection of sets along with their status
    * @param tasks collection of tasks
    * @param tasks collection of tasks
    * @param params additional parameters
    * @param params additional parameters
+   *
+   * @return the task wrappers, one for each task that is passed with {@code tasks}
    */
    */
   public static List<TaskWrapper> getTaskList(String service, String component, HostsType hostsType, List<Task> tasks, Map<String, String> params) {
   public static List<TaskWrapper> getTaskList(String service, String component, HostsType hostsType, List<Task> tasks, Map<String, String> params) {
     // Ok if Ambari Server is not part of the cluster hosts since this is only used in the calculation of how many batches
     // Ok if Ambari Server is not part of the cluster hosts since this is only used in the calculation of how many batches

+ 10 - 1
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/hdfs_namenode.py

@@ -533,7 +533,16 @@ def is_namenode_bootstrapped(params):
   return marked
   return marked
 
 
 
 
-@retry(times=125, sleep_time=5, backoff_factor=2, err_class=Fail)
+def find_timeout():
+  import params
+
+  if isinstance(params.command_timeout, (int, long)):
+    return params.command_timeout
+
+  return int(params.command_timeout)
+
+
+@retry(sleep_time=5, backoff_factor=2, err_class=Fail, timeout_func=find_timeout)
 def is_this_namenode_active():
 def is_this_namenode_active():
   """
   """
   Gets whether the current NameNode is Active. This function will wait until the NameNode is
   Gets whether the current NameNode is Active. This function will wait until the NameNode is

+ 2 - 0
ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py

@@ -70,6 +70,8 @@ version = default("/commandParams/version", None)
 # are started using different commands.
 # are started using different commands.
 desired_namenode_role = default("/commandParams/desired_namenode_role", None)
 desired_namenode_role = default("/commandParams/desired_namenode_role", None)
 
 
+command_timeout = default("/commandParams/command_timeout", 900)
+
 # get the correct version to use for checking stack features
 # get the correct version to use for checking stack features
 version_for_stack_feature_checks = get_stack_feature_version(config)
 version_for_stack_feature_checks = get_stack_feature_version(config)
 
 

+ 1 - 1
ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.3.xml

@@ -547,7 +547,7 @@
     <service name="HDFS">
     <service name="HDFS">
       <component name="NAMENODE">
       <component name="NAMENODE">
         <upgrade>
         <upgrade>
-          <task xsi:type="restart-task"/>
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
         </upgrade>
       </component>
       </component>
 
 

+ 1 - 1
ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.4.xml

@@ -577,7 +577,7 @@
         <pre-downgrade /> <!--  no-op to prevent config changes on downgrade -->
         <pre-downgrade /> <!--  no-op to prevent config changes on downgrade -->
 
 
         <upgrade>
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
         </upgrade>
       </component>
       </component>
 
 

+ 1 - 1
ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.5.xml

@@ -679,7 +679,7 @@
         <pre-downgrade /> <!--  no-op to prevent config changes on downgrade -->
         <pre-downgrade /> <!--  no-op to prevent config changes on downgrade -->
 
 
         <upgrade>
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
         </upgrade>
       </component>
       </component>
 
 

+ 1 - 1
ambari-server/src/main/resources/stacks/HDP/2.3/upgrades/upgrade-2.6.xml

@@ -688,7 +688,7 @@
         <pre-downgrade /> <!--  no-op to prevent config changes on downgrade -->
         <pre-downgrade /> <!--  no-op to prevent config changes on downgrade -->
 
 
         <upgrade>
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
         </upgrade>
       </component>
       </component>
 
 

+ 1 - 1
ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.4.xml

@@ -524,7 +524,7 @@
     <service name="HDFS">
     <service name="HDFS">
       <component name="NAMENODE">
       <component name="NAMENODE">
         <upgrade>
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
         </upgrade>
       </component>
       </component>
 
 

+ 1 - 1
ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.5.xml

@@ -679,7 +679,7 @@
         <pre-downgrade /> <!--  no-op to prevent config changes on downgrade -->
         <pre-downgrade /> <!--  no-op to prevent config changes on downgrade -->
 
 
         <upgrade>
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
         </upgrade>
       </component>
       </component>
 
 

+ 1 - 1
ambari-server/src/main/resources/stacks/HDP/2.4/upgrades/upgrade-2.6.xml

@@ -693,7 +693,7 @@
         <pre-downgrade /> <!--  no-op to prevent config changes on downgrade -->
         <pre-downgrade /> <!--  no-op to prevent config changes on downgrade -->
 
 
         <upgrade>
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
         </upgrade>
       </component>
       </component>
 
 

+ 1 - 1
ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.5.xml

@@ -574,7 +574,7 @@
     <service name="HDFS">
     <service name="HDFS">
       <component name="NAMENODE">
       <component name="NAMENODE">
         <upgrade>
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
         </upgrade>
       </component>
       </component>
 
 

+ 1 - 1
ambari-server/src/main/resources/stacks/HDP/2.5/upgrades/upgrade-2.6.xml

@@ -619,7 +619,7 @@
         </pre-upgrade>
         </pre-upgrade>
         <pre-downgrade />
         <pre-downgrade />
         <upgrade>
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
         </upgrade>
       </component>
       </component>
 
 

+ 1 - 1
ambari-server/src/main/resources/stacks/HDP/2.6/upgrades/upgrade-2.6.xml

@@ -610,7 +610,7 @@
         </pre-upgrade>
         </pre-upgrade>
         <pre-downgrade/> <!--  no-op to prevent config changes on downgrade -->
         <pre-downgrade/> <!--  no-op to prevent config changes on downgrade -->
         <upgrade>
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.nn-restart.timeout"/>
         </upgrade>
         </upgrade>
       </component>
       </component>
 
 

+ 1 - 0
ambari-server/src/main/resources/upgrade-pack.xsd

@@ -276,6 +276,7 @@
       <xs:element name="summary" minOccurs="0" />
       <xs:element name="summary" minOccurs="0" />
     </xs:sequence>
     </xs:sequence>
     <xs:attribute name="sequential" use="optional" type="xs:boolean" />
     <xs:attribute name="sequential" use="optional" type="xs:boolean" />
+    <xs:attribute name="timeout-config" use="optional" type="xs:string" />
   </xs:complexType>
   </xs:complexType>
   
   
   <xs:complexType name="restart-task">
   <xs:complexType name="restart-task">

+ 63 - 3
ambari-server/src/test/java/org/apache/ambari/server/controller/internal/UpgradeResourceProviderTest.java

@@ -42,9 +42,11 @@ import org.apache.ambari.server.H2DatabaseCleaner;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.ExecutionCommandWrapper;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.actionmanager.HostRoleStatus;
 import org.apache.ambari.server.actionmanager.Stage;
 import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.agent.ExecutionCommand.KeyNames;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.audit.AuditLogger;
 import org.apache.ambari.server.audit.AuditLogger;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.configuration.Configuration;
@@ -160,9 +162,10 @@ public class UpgradeResourceProviderTest {
 
 
     EasyMock.replay(configHelper);
     EasyMock.replay(configHelper);
 
 
+    InMemoryDefaultTestModule module = new InMemoryDefaultTestModule();
+
     // create an injector which will inject the mocks
     // create an injector which will inject the mocks
-    injector = Guice.createInjector(Modules.override(
-        new InMemoryDefaultTestModule()).with(new MockModule()));
+    injector = Guice.createInjector(Modules.override(module).with(new MockModule()));
 
 
     H2DatabaseCleaner.resetSequences(injector);
     H2DatabaseCleaner.resetSequences(injector);
     injector.getInstance(GuiceJpaInitializer.class);
     injector.getInstance(GuiceJpaInitializer.class);
@@ -250,9 +253,12 @@ public class UpgradeResourceProviderTest {
     sch = component.addServiceComponentHost("h1");
     sch = component.addServiceComponentHost("h1");
     sch.setVersion("2.1.1.0");
     sch.setVersion("2.1.1.0");
 
 
+    Configuration configuration = injector.getInstance(Configuration.class);
+    configuration.setProperty("upgrade.parameter.zk-server.timeout", "824");
+
     topologyManager = injector.getInstance(TopologyManager.class);
     topologyManager = injector.getInstance(TopologyManager.class);
     StageUtils.setTopologyManager(topologyManager);
     StageUtils.setTopologyManager(topologyManager);
-    StageUtils.setConfiguration(injector.getInstance(Configuration.class));
+    StageUtils.setConfiguration(configuration);
     ActionManager.setTopologyManager(topologyManager);
     ActionManager.setTopologyManager(topologyManager);
     EasyMock.replay(injector.getInstance(AuditLogger.class));
     EasyMock.replay(injector.getInstance(AuditLogger.class));
   }
   }
@@ -1651,6 +1657,60 @@ public class UpgradeResourceProviderTest {
         HostRoleStatus.IN_PROGRESS_STATUSES);
         HostRoleStatus.IN_PROGRESS_STATUSES);
   }
   }
 
 
+  @Test
+  public void testTimeouts() throws Exception {
+    Cluster cluster = clusters.getCluster("c1");
+
+    StackEntity stackEntity = stackDAO.find("HDP", "2.1.1");
+    RepositoryVersionEntity repoVersionEntity = new RepositoryVersionEntity();
+    repoVersionEntity.setDisplayName("My New Version 3");
+    repoVersionEntity.setOperatingSystems("");
+    repoVersionEntity.setStack(stackEntity);
+    repoVersionEntity.setVersion("2.2.2.3");
+    repoVersionDao.create(repoVersionEntity);
+
+    Map<String, Object> requestProps = new HashMap<>();
+    requestProps.put(UpgradeResourceProvider.UPGRADE_CLUSTER_NAME, "c1");
+    requestProps.put(UpgradeResourceProvider.UPGRADE_VERSION, "2.2.2.3");
+    requestProps.put(UpgradeResourceProvider.UPGRADE_PACK, "upgrade_test");
+    requestProps.put(UpgradeResourceProvider.UPGRADE_SKIP_PREREQUISITE_CHECKS, "true");
+    requestProps.put(UpgradeResourceProvider.UPGRADE_DIRECTION, Direction.UPGRADE.name());
+
+    ResourceProvider upgradeResourceProvider = createProvider(amc);
+
+    Request request = PropertyHelper.getCreateRequest(Collections.singleton(requestProps), null);
+    RequestStatus status = upgradeResourceProvider.createResources(request);
+
+
+    Set<Resource> createdResources = status.getAssociatedResources();
+    assertEquals(1, createdResources.size());
+    Resource res = createdResources.iterator().next();
+    Long id = (Long) res.getPropertyValue("Upgrade/request_id");
+    assertNotNull(id);
+    assertEquals(Long.valueOf(1), id);
+
+
+    ActionManager am = injector.getInstance(ActionManager.class);
+
+    List<HostRoleCommand> commands = am.getRequestTasks(id);
+
+    boolean found = false;
+
+    for (HostRoleCommand command : commands) {
+      ExecutionCommandWrapper wrapper = command.getExecutionCommandWrapper();
+
+      if (command.getRole().equals(Role.ZOOKEEPER_SERVER) && command.getRoleCommand().equals(RoleCommand.CUSTOM_COMMAND)) {
+        Map<String, String> commandParams = wrapper.getExecutionCommand().getCommandParams();
+        assertTrue(commandParams.containsKey(KeyNames.COMMAND_TIMEOUT));
+        assertEquals("824",commandParams.get(KeyNames.COMMAND_TIMEOUT));
+        found = true;
+      }
+    }
+
+    assertTrue("ZooKeeper timeout override was found", found);
+
+  }
+
   /**
   /**
    *
    *
    */
    */

+ 1 - 1
ambari-server/src/test/resources/stacks/HDP/2.1.1/upgrades/upgrade_test.xml

@@ -146,7 +146,7 @@
         </pre-upgrade>
         </pre-upgrade>
         <pre-downgrade copy-upgrade="true" />
         <pre-downgrade copy-upgrade="true" />
         <upgrade>
         <upgrade>
-          <task xsi:type="restart-task" />
+          <task xsi:type="restart-task" timeout-config="upgrade.parameter.zk-server.timeout"/>
         </upgrade>
         </upgrade>
         <post-upgrade>
         <post-upgrade>
           <task xsi:type="configure" id="hdp_2_1_1_zookeeper_new_config_type" />
           <task xsi:type="configure" id="hdp_2_1_1_zookeeper_new_config_type" />