Explorar o código

AMBARI-24291. Start All Services on 100-nodes cluster timed out after 1 hour. (#2449)

* AMBARI-24291. Start All Services on 100-nodes cluster timed out after 1 hour. (mpapirkovskyy)

* AMBARI-24291. Start All Services on 100-nodes cluster timed out after 1 hour. (mpapirkovskyy)
Myroslav Papirkovskyi %!s(int64=7) %!d(string=hai) anos
pai
achega
9719e54fa9

+ 4 - 0
ambari-server/src/main/java/org/apache/ambari/server/AmbariRuntimeException.java

@@ -25,4 +25,8 @@ public class AmbariRuntimeException extends RuntimeException {
   public AmbariRuntimeException(String message, Throwable cause) {
     super(message, cause);
   }
+
+  public AmbariRuntimeException(String message) {
+    super(message);
+  }
 }

+ 3 - 2
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/Stage.java

@@ -224,11 +224,12 @@ public class Stage {
   void loadExecutionCommandWrappers() {
     for (Map.Entry<String, Map<String, HostRoleCommand>> hostRoleCommandEntry : hostRoleCommands.entrySet()) {
       String hostname = hostRoleCommandEntry.getKey();
-      commandsToSend.put(hostname, new ArrayList<>());
+      List<ExecutionCommandWrapper> wrappers = new ArrayList<>();
       Map<String, HostRoleCommand> roleCommandMap = hostRoleCommandEntry.getValue();
       for (Map.Entry<String, HostRoleCommand> roleCommandEntry : roleCommandMap.entrySet()) {
-        commandsToSend.get(hostname).add(roleCommandEntry.getValue().getExecutionCommandWrapper());
+        wrappers.add(roleCommandEntry.getValue().getExecutionCommandWrapper());
       }
+      commandsToSend.put(hostname, wrappers);
     }
   }
 

+ 0 - 1
ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java

@@ -2554,7 +2554,6 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
     }
 
     Map<String, String> hostParams = new TreeMap<>();
-    hostParams.putAll(getRcaParameters());
 
     if (roleCommand.equals(RoleCommand.INSTALL)) {
       List<ServiceOsSpecific.Package> packages =

+ 0 - 1
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/hostcomponents/HostComponentsUpdateListener.java

@@ -49,7 +49,6 @@ public class HostComponentsUpdateListener {
   public HostComponentsUpdateListener(AmbariEventPublisher ambariEventPublisher,
                                       STOMPUpdatePublisher STOMPUpdatePublisher) {
     ambariEventPublisher.register(this);
-    STOMPUpdatePublisher.register(this);
     this.STOMPUpdatePublisher = STOMPUpdatePublisher;
   }
 

+ 2 - 1
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/requests/STOMPUpdateListener.java

@@ -41,7 +41,8 @@ public class STOMPUpdateListener {
   public STOMPUpdateListener(Injector injector, Set<STOMPEvent.Type> typesToProcess) {
     STOMPUpdatePublisher STOMPUpdatePublisher =
       injector.getInstance(STOMPUpdatePublisher.class);
-    STOMPUpdatePublisher.register(this);
+    STOMPUpdatePublisher.registerAgent(this);
+    STOMPUpdatePublisher.registerAPI(this);
     this.typesToProcess = typesToProcess == null ? Collections.emptySet() : typesToProcess;
   }
 

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/services/ServiceUpdateListener.java

@@ -57,7 +57,7 @@ public class ServiceUpdateListener {
 
   @Inject
   public ServiceUpdateListener(STOMPUpdatePublisher STOMPUpdatePublisher, AmbariEventPublisher ambariEventPublisher) {
-    STOMPUpdatePublisher.register(this);
+    STOMPUpdatePublisher.registerAPI(this);
     ambariEventPublisher.register(this);
 
     this.STOMPUpdatePublisher = STOMPUpdatePublisher;

+ 1 - 1
ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/UpgradeUpdateListener.java

@@ -48,7 +48,7 @@ public class UpgradeUpdateListener {
 
   @Inject
   public UpgradeUpdateListener(STOMPUpdatePublisher STOMPUpdatePublisher, AmbariEventPublisher ambariEventPublisher) {
-    STOMPUpdatePublisher.register(this);
+    STOMPUpdatePublisher.registerAPI(this);
 
     this.STOMPUpdatePublisher = STOMPUpdatePublisher;
   }

+ 42 - 10
ambari-server/src/main/java/org/apache/ambari/server/events/publishers/STOMPUpdatePublisher.java

@@ -17,8 +17,11 @@
  */
 package org.apache.ambari.server.events.publishers;
 
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import org.apache.ambari.server.AmbariRuntimeException;
+import org.apache.ambari.server.events.DefaultMessageEmitter;
 import org.apache.ambari.server.events.HostComponentsUpdateEvent;
 import org.apache.ambari.server.events.RequestUpdateEvent;
 import org.apache.ambari.server.events.STOMPEvent;
@@ -26,13 +29,15 @@ import org.apache.ambari.server.events.ServiceUpdateEvent;
 
 import com.google.common.eventbus.AsyncEventBus;
 import com.google.common.eventbus.EventBus;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 @Singleton
 public class STOMPUpdatePublisher {
 
-  private final EventBus m_eventBus;
+  private final EventBus agentEventBus;
+  private final EventBus apiEventBus;
 
   @Inject
   private RequestUpdateEventPublisher requestUpdateEventPublisher;
@@ -43,24 +48,51 @@ public class STOMPUpdatePublisher {
   @Inject
   private ServiceUpdateEventPublisher serviceUpdateEventPublisher;
 
-  public STOMPUpdatePublisher() {
-    m_eventBus = new AsyncEventBus("ambari-update-bus",
-      Executors.newSingleThreadExecutor());
+  private final ExecutorService threadPoolExecutorAgent = Executors.newSingleThreadExecutor(
+      new ThreadFactoryBuilder().setNameFormat("stomp-agent-bus-%d").build());
+  private final ExecutorService threadPoolExecutorAPI = Executors.newSingleThreadExecutor(
+      new ThreadFactoryBuilder().setNameFormat("stomp-api-bus-%d").build());
+
+  public STOMPUpdatePublisher() throws NoSuchFieldException, IllegalAccessException {
+    agentEventBus = new AsyncEventBus("agent-update-bus",
+        threadPoolExecutorAgent);
+
+    apiEventBus = new AsyncEventBus("api-update-bus",
+        threadPoolExecutorAPI);
   }
 
   public void publish(STOMPEvent event) {
+    if (DefaultMessageEmitter.DEFAULT_AGENT_EVENT_TYPES.contains(event.getType())) {
+      publishAgent(event);
+    } else if (DefaultMessageEmitter.DEFAULT_API_EVENT_TYPES.contains(event.getType())) {
+      publishAPI(event);
+    } else {
+      // TODO need better solution
+      throw new AmbariRuntimeException("Event with type {" + event.getType() + "} can not be published.");
+    }
+  }
+
+  private void publishAPI(STOMPEvent event) {
     if (event.getType().equals(STOMPEvent.Type.REQUEST)) {
-      requestUpdateEventPublisher.publish((RequestUpdateEvent) event, m_eventBus);
+      requestUpdateEventPublisher.publish((RequestUpdateEvent) event, apiEventBus);
     } else if (event.getType().equals(STOMPEvent.Type.HOSTCOMPONENT)) {
-      hostComponentUpdateEventPublisher.publish((HostComponentsUpdateEvent) event, m_eventBus);
+      hostComponentUpdateEventPublisher.publish((HostComponentsUpdateEvent) event, apiEventBus);
     } else if (event.getType().equals(STOMPEvent.Type.SERVICE)) {
-      serviceUpdateEventPublisher.publish((ServiceUpdateEvent) event, m_eventBus);
+      serviceUpdateEventPublisher.publish((ServiceUpdateEvent) event, apiEventBus);
     } else {
-      m_eventBus.post(event);
+      apiEventBus.post(event);
     }
   }
 
-  public void register(Object object) {
-    m_eventBus.register(object);
+  private void publishAgent(STOMPEvent event) {
+    agentEventBus.post(event);
+  }
+
+  public void registerAgent(Object object) {
+    agentEventBus.register(object);
+  }
+
+  public void registerAPI(Object object) {
+    apiEventBus.register(object);
   }
 }

+ 2 - 1
ambari-server/src/main/java/org/apache/ambari/server/metrics/system/impl/MetricsServiceImpl.java

@@ -116,7 +116,8 @@ public class MetricsServiceImpl implements MetricsService {
         src.init(MetricsConfiguration.getSubsetConfiguration(configuration, "source." + sourceName + "."), sink);
         sources.put(sourceName, src);
         if (src instanceof StompEventsMetricsSource) {
-          STOMPUpdatePublisher.register(src);
+          STOMPUpdatePublisher.registerAPI(src);
+          STOMPUpdatePublisher.registerAgent(src);
         }
         src.start();
       }

+ 10 - 10
ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java

@@ -1280,6 +1280,16 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
       LOG.error("Could not determine stale config", e);
     }
 
+    try {
+      Cluster cluster = clusters.getCluster(clusterName);
+      ServiceComponent serviceComponent = cluster.getService(serviceName).getServiceComponent(serviceComponentName);
+      ServiceComponentHost sch = serviceComponent.getServiceComponentHost(hostName);
+      String refreshConfigsCommand = helper.getRefreshConfigsCommand(cluster,sch);
+      r.setReloadConfig(refreshConfigsCommand != null);
+    } catch (Exception e) {
+      LOG.error("Could not determine reload config flag", e);
+    }
+
     return r;
   }
 
@@ -1307,16 +1317,6 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
       r.setStaleConfig(false);
     }
 
-    try {
-      Cluster cluster = clusters.getCluster(clusterName);
-      ServiceComponent serviceComponent = cluster.getService(serviceName).getServiceComponent(serviceComponentName);
-      ServiceComponentHost sch = serviceComponent.getServiceComponentHost(hostName);
-      String refreshConfigsCommand = helper.getRefreshConfigsCommand(cluster,sch);
-      r.setReloadConfig(refreshConfigsCommand != null);
-    } catch (Exception e) {
-      LOG.error("Could not determine reload config flag", e);
-    }
-
     return r;
   }