소스 검색

AMBARI-5488. Flume: service and component status should be calculated correctly (ncole)

Nate Cole 11 년 전
부모
커밋
5b2e8087b3

+ 44 - 0
ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java

@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Set;
 
 import com.google.inject.Inject;
+
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.ClusterNotFoundException;
 import org.apache.ambari.server.DuplicateResourceException;
@@ -104,6 +105,7 @@ public class ServiceResourceProvider extends AbstractControllerResourceProvider
   static {
     serviceStateMap.put("HDFS", new HDFSServiceState());
     serviceStateMap.put("HBASE", new HBaseServiceState());
+    serviceStateMap.put("FLUME", new FlumeServiceState());
   }
 
   private static final ServiceState DEFAULT_SERVICE_STATE = new DefaultServiceState();
@@ -1130,6 +1132,48 @@ public class ServiceResourceProvider extends AbstractControllerResourceProvider
     }
   }
 
+  /**
+   * Determines the service status for Flume.  The result is the lowest ordinal state
+   * among the service's host components, or UNKNOWN when none can be calculated:
+   * <ul>
+   *   <li>If all handlers are STARTED, service is STARTED.</li>
+   *   <li>If one handler is INSTALLED and the others STARTED, service is INSTALLED.</li>
+   * </ul>
+   */
+  protected static class FlumeServiceState implements ServiceState {
+    @Override
+    public State getState(AmbariManagementController controller,
+        String clusterName, String serviceName) {
+      Clusters       clusters       = controller.getClusters();
+
+      if (clusterName != null && clusterName.length() > 0) { // otherwise falls through to UNKNOWN
+        try {
+          Cluster cluster = clusters.getCluster(clusterName);
+          if (cluster != null) {
+
+            ServiceComponentHostRequest request = new ServiceComponentHostRequest(clusterName,
+                serviceName, null, null, null); // only cluster and service names are supplied
+
+            Set<ServiceComponentHostResponse> hostComponentResponses =
+                controller.getHostComponents(Collections.singleton(request));
+            
+            State state = State.UNKNOWN; // replaced only by a state with a lower ordinal
+            for (ServiceComponentHostResponse schr : hostComponentResponses) {
+              State schState = getHostComponentState(schr);
+              if (schState.ordinal() < state.ordinal())
+                state = schState;
+            }
+            return state;
+          }
+        } catch (AmbariException e) {
+          LOG.error("Can't determine service state.", e); // logged, not rethrown
+        }
+      }
+       
+      return State.UNKNOWN; // no cluster name, null cluster, or AmbariException
+    }
+  }
+
   /**
    * Determine whether a service state change is valid.
    * Looks at projected state from the current stages associated with the request.

+ 9 - 13
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py

@@ -53,7 +53,6 @@ def flume(action = None):
         content = json.dumps(ambari_meta(agent, flume_agents[agent])),
         mode = 0644)
 
-
   elif action == 'start':
     flume_base = format('env JAVA_HOME={java_home} /usr/bin/flume-ng agent '
       '--name {{0}} '
@@ -97,9 +96,6 @@ def flume(action = None):
     for pid_file in pid_files:
       File(pid_file, action = 'delete')
     
-    pass
-  elif action == 'status':
-    pass
 
 def ambari_meta(agent_name, agent_conf):
   res = {}
@@ -160,7 +156,6 @@ def is_live(pid_file):
 
 def live_status(pid_file):
   import params
-  import traceback
 
   pid_file_part = pid_file.split(os.sep).pop()
 
@@ -185,22 +180,23 @@ def live_status(pid_file):
       res['sinks_count'] = meta['sinks_count']
       res['channels_count'] = meta['channels_count']
   except:
-    traceback.print_exc() 
     pass
 
   return res
   
-
 def flume_status():
   import params
-  
-  procs = []
 
-  pid_files = glob.glob(params.flume_run_dir + os.sep + "*.pid")
+  # these are what Ambari believes should be running
+  meta_files = glob.glob(params.flume_conf_dir + os.sep + "*/ambari-meta.json")
+  pid_files = []
+  for meta_file in meta_files:
+    agent_name = os.path.dirname(meta_file).split(os.sep).pop()
+    pid_files.append(os.path.join(params.flume_run_dir, agent_name + '.pid'))
 
-  if 0 != len(pid_files):
-    for pid_file in pid_files:
-      procs.append(live_status(pid_file))
+  procs = []
+  for pid_file in pid_files:
+    procs.append(live_status(pid_file))
 
   return procs
 

+ 10 - 1
ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume_handler.py

@@ -55,10 +55,19 @@ class FlumeHandler(Script):
 
     env.set_params(params)
 
+    processes = flume_status()
+
     json = {}
-    json['processes'] = flume_status()
+    json['processes'] = processes
 
     self.put_structured_out(json)
 
+    if 0 == len(processes):
+      raise ComponentIsNotRunning()
+    else:
+      for proc in processes:
+        if not proc.has_key('status') or proc['status'] == 'NOT_RUNNING':
+          raise ComponentIsNotRunning()
+
 if __name__ == "__main__":
   FlumeHandler().execute()

+ 58 - 0
ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ServiceResourceProviderTest.java

@@ -1185,7 +1185,65 @@ public class ServiceResourceProviderTest {
     // verify
     verify(managementController, clusters, cluster, ambariMetaInfo, stackId, componentInfo);
   }
+  
+  @Test
+  public void testFlumeServiceState_STARTED() throws Exception {
+    AmbariManagementController managementController = createMock(AmbariManagementController.class);
+    Clusters clusters = createNiceMock(Clusters.class);
+    Cluster cluster = createNiceMock(Cluster.class);
+
+    ServiceComponentHostResponse shr1 = new ServiceComponentHostResponse("C1", "FLUME", "FLUME_HANDLER", "Host100", "STARTED", "", null, null, null);
+    ServiceComponentHostResponse shr2 = new ServiceComponentHostResponse("C1", "FLUME", "FLUME_HANDLER", "Host200", "STARTED", "", null, null, null);
+
+    Set<ServiceComponentHostResponse> responses = new LinkedHashSet<ServiceComponentHostResponse>();
+    responses.add(shr1);
+    responses.add(shr2);
+
+    // set expectations: any host-component request returns both responses built above
+    expect(managementController.getClusters()).andReturn(clusters).anyTimes();
+    expect(clusters.getCluster("C1")).andReturn(cluster).anyTimes();
+    expect(managementController.getHostComponents((Set<ServiceComponentHostRequest>) anyObject())).andReturn(responses).anyTimes();
+    
+    replay(managementController, clusters, cluster);
+    
+    ServiceResourceProvider.ServiceState serviceState = new ServiceResourceProvider.FlumeServiceState();
+
+    State state = serviceState.getState(managementController, "C1", "FLUME");
+    
+    Assert.assertEquals(State.STARTED, state); // both responses were built with "STARTED"
+    
+    verify(managementController, clusters, cluster);
+  }
+  
+  @Test
+  public void testFlumeServiceState_INSTALLED() throws Exception {
+    AmbariManagementController managementController = createMock(AmbariManagementController.class);
+    Clusters clusters = createNiceMock(Clusters.class);
+    Cluster cluster = createNiceMock(Cluster.class);
+
+    ServiceComponentHostResponse shr1 = new ServiceComponentHostResponse("C1", "FLUME", "FLUME_HANDLER", "Host100", "STARTED", "", null, null, null);
+    ServiceComponentHostResponse shr2 = new ServiceComponentHostResponse("C1", "FLUME", "FLUME_HANDLER", "Host200", "INSTALLED", "", null, null, null);
+
+    Set<ServiceComponentHostResponse> responses = new LinkedHashSet<ServiceComponentHostResponse>();
+    responses.add(shr1);
+    responses.add(shr2);
+
+    // set expectations: any host-component request returns both responses built above
+    expect(managementController.getClusters()).andReturn(clusters).anyTimes();
+    expect(clusters.getCluster("C1")).andReturn(cluster).anyTimes();
+    expect(managementController.getHostComponents((Set<ServiceComponentHostRequest>) anyObject())).andReturn(responses).anyTimes();
+    
+    replay(managementController, clusters, cluster);
+    
+    ServiceResourceProvider.ServiceState serviceState = new ServiceResourceProvider.FlumeServiceState();
 
+    State state = serviceState.getState(managementController, "C1", "FLUME");
+    Assert.assertEquals(State.INSTALLED, state); // one "INSTALLED" response next to a "STARTED" one
+    
+    verify(managementController, clusters, cluster);
+  }
+
+  
   public static ServiceResourceProvider getServiceProvider(AmbariManagementController managementController) {
     Resource.Type type = Resource.Type.Service;
     return new ServiceResourceProvider(PropertyHelper.getPropertyIds(type),

+ 17 - 5
ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py

@@ -76,11 +76,17 @@ class TestFlumeHandler(RMFTestCase):
     self.assertNoMoreResources()
 
   @patch("resource_management.libraries.script.Script.put_structured_out")
-  def test_status_default(self, structured_out_mock):
-    self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
+  @patch("sys.exit")
+  def test_status_default(self, sys_exit_mock, structured_out_mock):
+    
+    try:
+      self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
                        classname = "FlumeHandler",
                        command = "status",
                        config_file="default.json")
+    except:
+      # expected since ComponentIsNotRunning gets raised
+      pass
     
     # test that the method was called with empty processes
     self.assertTrue(structured_out_mock.called)
@@ -90,15 +96,21 @@ class TestFlumeHandler(RMFTestCase):
 
   @patch("resource_management.libraries.script.Script.put_structured_out")
   @patch("glob.glob")
-  def test_status_with_result(self, glob_mock, structured_out_mock):
-    glob_mock.return_value = ['/var/run/flume/a1.pid']
+  @patch("sys.exit")
+  def test_status_with_result(self, sys_exit_mock, glob_mock, structured_out_mock):
+    glob_mock.return_value = ['/etc/flume/conf/a1/ambari-meta.json']
 
-    self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
+    try:
+      self.executeScript("2.0.6/services/FLUME/package/scripts/flume_handler.py",
                        classname = "FlumeHandler",
                        command = "status",
                        config_file="default.json")
+    except:
+      # expected since ComponentIsNotRunning gets raised
+      pass
     
     self.assertTrue(structured_out_mock.called)
+
     structured_out_mock.assert_called_with({'processes':
       [{'status': 'NOT_RUNNING', 'channels_count': 0, 'sinks_count': 0,
         'name': 'a1', 'sources_count': 0}]})