Selaa lähdekoodia

AMBARI-4110. Avoid duplicating host names in "clusterHostInfo" when preparing tasks.(odiachenko)

Oleksandr Diachenko 11 vuotta sitten
vanhempi
commit
3a84e4f939

+ 2 - 0
ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py

@@ -29,6 +29,7 @@ from PythonExecutor import PythonExecutor
 from AmbariConfig import AmbariConfig
 import hostname
 from LiveStatus import LiveStatus
+import manifestGenerator
 
 
 logger = logging.getLogger()
@@ -211,6 +212,7 @@ class CustomServiceOrchestrator():
       file_path = os.path.join(self.tmp_dir, "status_command.json")
     else:
       task_id = command['taskId']
+      command['clusterHostInfo'] = manifestGenerator.decompressClusterHostInfo(command['clusterHostInfo'])
       file_path = os.path.join(self.tmp_dir, "command-{0}.json".format(task_id))
     # Json may contain passwords, that's why we need proper permissions
     if os.path.isfile(file_path):

+ 96 - 2
ambari-agent/src/main/python/ambari_agent/manifestGenerator.py

@@ -25,7 +25,10 @@ from datetime import datetime
 import pprint
 import AmbariConfig
 import hostname
+from ambari_agent import AgentException
 
+HOSTS_LIST_KEY = "all_hosts"
+PING_PORTS_KEY = "all_ping_ports"
 
 logger = logging.getLogger()
 
@@ -37,6 +40,97 @@ non_global_configuration_types = ["hdfs-site", "core-site",
                              "webhcat-site", "hdfs-exclude-file", "hue-site",
                              "yarn-site"]
 
+# Converts from 1-3,5,6-8 to [1,2,3,5,6,7,8] 
+def convertRangeToList(list):
+  
+  resultList = []
+
+  for i in list:
+      
+    ranges = i.split(',')
+    
+    for r in ranges:
+      rangeBounds = r.split('-')
+      if len(rangeBounds) == 2:
+        
+        if not rangeBounds[0] or not rangeBounds[1]:
+          raise AgentException.AgentException("Broken data in given range, expected - ""m-n"" or ""m"", got : " + str(r))
+            
+        
+        resultList.extend(range(int(rangeBounds[0]), int(rangeBounds[1]) + 1))
+      elif len(rangeBounds) == 1:
+        resultList.append((int(rangeBounds[0])))
+      else:
+        raise AgentException.AgentException("Broken data in given range, expected - ""m-n"" or ""m"", got : " + str(r))
+    
+  return resultList
+
+#Converts from ['1:0-2,4', '42:3,5-7'] to [1,1,1,42,1,42,42,42]
+def convertMappedRangeToList(list):
+    
+  resultDict = {}
+  
+  for i in list:
+    valueToRanges = i.split(":")
+    if len(valueToRanges) <> 2:
+      raise AgentException.AgentException("Broken data in given value to range, expected format - ""value:m-n"", got - " + str(i))
+    value = valueToRanges[0]
+    rangesToken = valueToRanges[1]
+    
+    for r in rangesToken.split(','):
+        
+      rangeIndexes = r.split('-')
+    
+      if len(rangeIndexes) == 2:
+          
+        if not rangeIndexes[0] or not rangeIndexes[1]:
+          raise AgentException.AgentException("Broken data in given value to range, expected format - ""value:m-n"", got - " + str(r))
+
+        start = int(rangeIndexes[0])
+        end = int(rangeIndexes[1])
+        
+        for k in range(start, end + 1):
+          resultDict[k] = int(value)
+        
+        
+      elif len(rangeIndexes) == 1:
+        index = int(rangeIndexes[0])
+        
+        resultDict[index] = int(value)
+       
+
+  resultList = dict(sorted(resultDict.items())).values()
+      
+  return resultList
+
+def decompressClusterHostInfo(clusterHostInfo):
+  info = clusterHostInfo.copy()
+  #Pop info not related to host roles  
+  hostsList = info.pop(HOSTS_LIST_KEY)
+  pingPorts = info.pop(PING_PORTS_KEY)
+
+  decompressedMap = {}
+
+  for k,v in info.items():
+    # Convert from 1-3,5,6-8 to [1,2,3,5,6,7,8] 
+    indexes = convertRangeToList(v)
+    # Convert from [1,2,3,5,6,7,8] to [host1,host2,host3...]
+    decompressedMap[k] = [hostsList[i] for i in indexes]
+  
+  #Convert from ['1:0-2,4', '42:3,5-7'] to [1,1,1,42,1,42,42,42]
+  pingPorts = convertMappedRangeToList(pingPorts)
+  
+  #Convert all elements to str
+  pingPorts = map(str, pingPorts)
+
+  #Add ping ports to result
+  decompressedMap[PING_PORTS_KEY] = pingPorts
+  #Add hosts list to result
+  decompressedMap[HOSTS_LIST_KEY] = hostsList
+  
+  return decompressedMap
+
+
 #read static imports from file and write them to manifest
 def writeImports(outputFile, modulesdir, importsList):
   logger.info("Modules dir is " + modulesdir)
@@ -55,7 +149,7 @@ def generateManifest(parsedJson, fileName, modulesdir, ambariconfig):
   clusterHostInfo = {} 
   if 'clusterHostInfo' in parsedJson:
     if parsedJson['clusterHostInfo']:
-      clusterHostInfo = parsedJson['clusterHostInfo']
+      clusterHostInfo = decompressClusterHostInfo(parsedJson['clusterHostInfo'])
   params = {}
   if 'hostLevelParams' in parsedJson: 
     if parsedJson['hostLevelParams']:
@@ -120,7 +214,6 @@ def generateManifest(parsedJson, fileName, modulesdir, ambariconfig):
   except TypeError:
     errMsg = 'Manifest can\'t be generated from the JSON \n' + \
                     json.dumps(parsedJson, sort_keys=True, indent=4)
-
     logger.error(errMsg)
   finally:
     manifest.close()
@@ -292,6 +385,7 @@ def main():
   inputJsonStr = jsonStr
   parsedJson = json.loads(inputJsonStr)
   generateManifest(parsedJson, 'site.pp', modulesdir)
+  
 
 if __name__ == '__main__':
   main()

+ 4 - 2
ambari-agent/src/main/python/ambari_agent/test.json

@@ -4,8 +4,10 @@
 
 "clusterHostInfo" : 
 {
-"NAMENODE": ["h2.hortonworks.com"],
-"DATANODE": ["h1.hortonworks.com", "h2.hortonworks.com"]
+"namenode_host" : ["1"],
+"slave_hosts"   : ["0", "1"],
+"all_hosts"     : ["h1.hortonworks.com", "h2.hortonworks.com"],
+"all_ping_ports"    : ["8670:0,1"]
 },
 "hostLevelParams": 
 {

+ 17 - 2
ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py

@@ -37,6 +37,7 @@ import sys
 from AgentException import AgentException
 from FileCache import FileCache
 from LiveStatus import LiveStatus
+import manifestGenerator
 
 
 class TestCustomServiceOrchestrator(TestCase):
@@ -55,10 +56,11 @@ class TestCustomServiceOrchestrator(TestCase):
     self.config.set('python', 'custom_actions_dir', tmpdir)
 
 
+  @patch.object(manifestGenerator, 'decompressClusterHostInfo')
   @patch("hostname.public_hostname")
   @patch("os.path.isfile")
   @patch("os.unlink")
-  def test_dump_command_to_json(self, unlink_mock, isfile_mock, hostname_mock):
+  def test_dump_command_to_json(self, unlink_mock, isfile_mock, hostname_mock, decompress_cluster_host_info_mock):
     hostname_mock.return_value = "test.hst"
     command = {
       'commandType': 'EXECUTION_COMMAND',
@@ -69,8 +71,18 @@ class TestCustomServiceOrchestrator(TestCase):
       'clusterName': u'cc',
       'serviceName': u'HDFS',
       'configurations':{'global' : {}},
-      'configurationTags':{'global' : { 'tag': 'v1' }}
+      'configurationTags':{'global' : { 'tag': 'v1' }},
+      'clusterHostInfo':{'namenode_host' : ['1'],
+                         'slave_hosts'   : ['0', '1'],
+                         'all_hosts'     : ['h1.hortonworks.com', 'h2.hortonworks.com'],
+                         'all_ping_ports': ['8670:0,1']}
     }
+    
+    decompress_cluster_host_info_mock.return_value = {'namenode_host' : ['h2.hortonworks.com'],
+                         'slave_hosts'   : ['h1.hortonworks.com', 'h2.hortonworks.com'],
+                         'all_hosts'     : ['h1.hortonworks.com', 'h2.hortonworks.com'],
+                         'all_ping_ports': ['8670', '8670']}
+    
     config = AmbariConfig().getConfig()
     tempdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tempdir)
@@ -82,14 +94,17 @@ class TestCustomServiceOrchestrator(TestCase):
     self.assertTrue(os.path.getsize(json_file) > 0)
     self.assertEqual(oct(os.stat(json_file).st_mode & 0777), '0600')
     self.assertTrue(json_file.endswith("command-3.json"))
+    self.assertTrue(decompress_cluster_host_info_mock.called)
     os.unlink(json_file)
     # Test dumping STATUS_COMMAND
     command['commandType']='STATUS_COMMAND'
+    decompress_cluster_host_info_mock.reset_mock()
     json_file = orchestrator.dump_command_to_json(command)
     self.assertTrue(os.path.exists(json_file))
     self.assertTrue(os.path.getsize(json_file) > 0)
     self.assertEqual(oct(os.stat(json_file).st_mode & 0777), '0600')
     self.assertTrue(json_file.endswith("status_command.json"))
+    self.assertFalse(decompress_cluster_host_info_mock.called)
     os.unlink(json_file)
     # Testing side effect of dump_command_to_json
     self.assertEquals(command['public_hostname'], "test.hst")

+ 108 - 1
ambari-agent/src/test/python/ambari_agent/TestManifestGenerator.py

@@ -18,6 +18,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 import os, sys, StringIO
+from ambari_agent.AgentException import AgentException
 
 from unittest import TestCase
 from ambari_agent import manifestGenerator
@@ -66,7 +67,8 @@ class TestManifestGenerator(TestCase):
   @patch.object(manifestGenerator, 'writeNodes')
   @patch.object(manifestGenerator, 'writeParams')
   @patch.object(manifestGenerator, 'writeTasks')
-  def testGenerateManifest(self, writeTasksMock, writeParamsMock, writeNodesMock, writeImportsMock):
+  @patch.object(manifestGenerator, 'decompressClusterHostInfo')
+  def testGenerateManifest(self, decompressClusterHostInfoMock, writeTasksMock, writeParamsMock, writeNodesMock, writeImportsMock):
     tmpFileName = tempfile.mkstemp(dir=self.dir, text=True)[1]
     self.parsedJson['roleParams'] = 'role param'
     manifestGenerator.generateManifest(self.parsedJson, tmpFileName, '../../main/puppet/modules', self.config.getConfig())
@@ -75,6 +77,7 @@ class TestManifestGenerator(TestCase):
     self.assertTrue(writeNodesMock.called)
     self.assertTrue(writeImportsMock.called)
     self.assertTrue(writeTasksMock.called)
+    self.assertTrue(decompressClusterHostInfoMock.called)
 
     print file(tmpFileName).read()
 
@@ -140,3 +143,107 @@ class TestManifestGenerator(TestCase):
     print tmpFile.read()
     tmpFile.close()
     os.remove(tmpFileName)
+    
+  def testConvertRangeToList(self):
+    
+    rangesList = ["1-3", "4", "6", "7-9"]
+    list = manifestGenerator.convertRangeToList(rangesList)
+    self.assertEqual(sorted(list), sorted([1,2,3,4,6,7,8,9]))
+    
+    rangesList = ["5", "4"]
+    list = manifestGenerator.convertRangeToList(rangesList)
+    self.assertEqual(list, [5,4])
+
+    exceptionWasTrown = False
+    try:
+      rangesList = ["0", "-2"]
+      list = manifestGenerator.convertRangeToList(rangesList)
+    except AgentException, err:
+      #Expected
+      exceptionWasTrown = True
+      
+    self.assertTrue(exceptionWasTrown)
+    
+    exceptionWasTrown = False
+    try:
+      rangesList = ["0", "-"]
+      list = manifestGenerator.convertRangeToList(rangesList)
+    except AgentException, err:
+      #Expected
+      exceptionWasTrown = True
+    self.assertTrue(exceptionWasTrown)
+    
+    exceptionWasTrown = False
+    try:
+      rangesList = ["0", "2-"]
+      list = manifestGenerator.convertRangeToList(rangesList)
+    except AgentException, err:
+      #Expected
+      exceptionWasTrown = True
+    self.assertTrue(exceptionWasTrown)
+    
+  def testConvertMappedRangeToList(self):
+    mappedRangedList = ["1:0-2,5", "2:3,4"]
+    list = manifestGenerator.convertMappedRangeToList(mappedRangedList)
+    self.assertEqual(list, [1,1,1,2,2,1])
+    
+    mappedRangedList = ["7:0"]
+    list = manifestGenerator.convertMappedRangeToList(mappedRangedList)
+    self.assertEqual(list, [7])
+    
+    exceptionWasTrown = False
+    mappedRangedList = ["7:0-"]
+    try:
+      list = manifestGenerator.convertMappedRangeToList(mappedRangedList)
+    except AgentException, err:
+      #Expected
+      exceptionWasTrown = True
+    self.assertTrue(exceptionWasTrown)
+    
+    
+    exceptionWasTrown = False
+    mappedRangedList = ["7:-"]
+    try:
+      list = manifestGenerator.convertMappedRangeToList(mappedRangedList)
+    except AgentException, err:
+      #Expected
+      exceptionWasTrown = True
+    self.assertTrue(exceptionWasTrown)
+    
+    exceptionWasTrown = False
+    mappedRangedList = ["7:-1"]
+    try:
+      list = manifestGenerator.convertMappedRangeToList(mappedRangedList)
+    except AgentException, err:
+      #Expected
+      exceptionWasTrown = True
+    self.assertTrue(exceptionWasTrown)
+    
+    def testDecompressClusterHostInfo(self):
+        
+      info = { "jtnode_host"        : ["5"],
+               "hbase_master_hosts" : ["5"],
+               "all_hosts"          : ["h8", "h9", "h5", "h4", "h7", "h6", "h1", "h3", "h2", "h10"],
+               "namenode_host"      : ["6"],
+               "mapred_tt_hosts"    : ["0", "7-9", "2","3", "5"],
+               "slave_hosts"        : ["3", "0", "1", "5-9"],
+               "snamenode_host"     : ["8"],
+               "ping_ports"         : ["8670:1,5-8", "8673:9", "8672:0,4", "8671:2,3"],
+               "hbase_rs_hosts"     : ["3", "1", "5", "8", "9"]
+      }
+        
+      decompressedInfo = manifestGenerator.decompressClusterHostInfo(clusterHostInfo)
+      
+      self.assertTrue(decompressedInfo.has_key("all_hosts"))
+      
+      allHosts = decompressedInfo.pop("all_hosts")
+      
+      self.assertEquals(info.get("all_hosts"), decompressedInfo.get("all_hosts"))
+      
+      pingPorts = decompressedInfo.pop("all_ping_ports")
+      
+      self.assertEquals(pingPorts, manifestGenerator.convertMappedRangeToList(info.get("all_ping_ports")))
+      
+      for k,v in decompressedInfo.items():
+        self.assertEquals(v, manifestGenerator.convertRangeToList(info.get(k)))
+

+ 4 - 3
ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java

@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
@@ -82,7 +83,7 @@ class ActionScheduler implements Runnable {
    */
   private boolean activeAwakeRequest = false;
   //Cache for clusterHostinfo, key - stageId-requestId
-  private Cache<String, Map<String, List<String>>> clusterHostInfoCache;
+  private Cache<String, Map<String, Set<String>>> clusterHostInfoCache;
 
   public ActionScheduler(long sleepTimeMilliSec, long actionTimeoutMilliSec,
       ActionDBAccessor db, ActionQueue actionQueue, Clusters fsmObject,
@@ -454,10 +455,10 @@ class ActionScheduler implements Runnable {
 
     //Try to get clusterHostInfo from cache
     String stagePk = s.getStageId() + "-" + s.getRequestId();
-    Map<String, List<String>> clusterHostInfo = clusterHostInfoCache.getIfPresent(stagePk);
+    Map<String, Set<String>> clusterHostInfo = clusterHostInfoCache.getIfPresent(stagePk);
     
     if (clusterHostInfo == null) {
-      Type type = new TypeToken<Map<String, List<String>>>() {}.getType();
+      Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
       clusterHostInfo = StageUtils.getGson().fromJson(s.getClusterHostInfo(), type);
       clusterHostInfoCache.put(stagePk, clusterHostInfo);
     }

+ 9 - 6
ambari-server/src/main/java/org/apache/ambari/server/agent/ExecutionCommand.java

@@ -18,10 +18,9 @@
 package org.apache.ambari.server.agent;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
-import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.utils.StageUtils;
 import org.apache.commons.logging.Log;
@@ -50,8 +49,8 @@ public class ExecutionCommand extends AgentCommand {
   private Map<String, String> hostLevelParams = new HashMap<String, String>();
   private Map<String, String> roleParams = null;
   private RoleCommand roleCommand;
-  private Map<String, List<String>> clusterHostInfo = 
-      new HashMap<String, List<String>>();
+  private Map<String, Set<String>> clusterHostInfo = 
+      new HashMap<String, Set<String>>();
   private Map<String, Map<String, String>> configurations;
   private Map<String, Map<String, String>> configurationTags;
   private Map<String, String> commandParams;
@@ -166,12 +165,12 @@ public class ExecutionCommand extends AgentCommand {
   }
 
   @JsonProperty("clusterHostInfo")
-  public Map<String, List<String>> getClusterHostInfo() {
+  public Map<String, Set<String>> getClusterHostInfo() {
     return clusterHostInfo;
   }
 
   @JsonProperty("clusterHostInfo")
-  public void setClusterHostInfo(Map<String, List<String>> clusterHostInfo) {
+  public void setClusterHostInfo(Map<String, Set<String>> clusterHostInfo) {
     this.clusterHostInfo = clusterHostInfo;
   }
   
@@ -252,6 +251,10 @@ public class ExecutionCommand extends AgentCommand {
     String REPO_INFO = "repo_info";
     String DB_NAME = "db_name";
     String GLOBAL = "global";
+    String AMBARI_DB_RCA_URL = "ambari_db_rca_url";
+    String AMBARI_DB_RCA_DRIVER = "ambari_db_rca_driver";
+    String AMBARI_DB_RCA_USERNAME = "ambari_db_rca_username";
+    String AMBARI_DB_RCA_PASSWORD = "ambari_db_rca_password";
     String SERVICE_CHECK = "SERVICE_CHECK"; // TODO: is it standart command? maybe add it to RoleCommand enum?
 
     String COMMAND_TIMEOUT_DEFAULT = "600"; // TODO: Will be replaced by proper initialization in another jira

+ 2 - 0
ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariCustomCommandExecutionHelperImpl.java

@@ -221,6 +221,7 @@ public class AmbariCustomCommandExecutionHelperImpl implements AmbariCustomComma
     hostLevelParams.put(JDK_LOCATION, amc.getJdkResourceUrl());
     hostLevelParams.put(STACK_NAME, stackId.getStackName());
     hostLevelParams.put(STACK_VERSION,stackId.getStackVersion());
+    hostLevelParams.putAll(amc.getRcaParameters());
     execCmd.setHostLevelParams(hostLevelParams);
 
     Map<String,String> commandParams = new TreeMap<String, String>();
@@ -427,6 +428,7 @@ public class AmbariCustomCommandExecutionHelperImpl implements AmbariCustomComma
     hostParams.put(DB_NAME, amc.getServerDB());
     hostParams.put(MYSQL_JDBC_URL, amc.getMysqljdbcUrl());
     hostParams.put(ORACLE_JDBC_URL, amc.getOjdbcUrl());
+    hostParams.putAll(amc.getRcaParameters());
 
     // Write down os specific info for the service
     ServiceOsSpecific anyOs = null;

+ 8 - 0
ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementController.java

@@ -482,5 +482,13 @@ public interface AmbariManagementController {
    */
   public Map<String, Map<String,String>> findConfigurationTagsWithOverrides(
           Cluster cluster, String hostName) throws AmbariException;
+  
+  /**
+   * Returns parameters for RCA database
+   *
+   * @return the map with parameters for RCA db
+   *
+   */
+  public Map<String, String> getRcaParameters();
 }
   

+ 24 - 2
ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java

@@ -1047,7 +1047,7 @@ public class AmbariManagementControllerImpl implements
       // FIXME cannot work with a single stage
       // multiple stages may be needed for reconfigure
       long stageId = 0;
-      Map<String, List<String>> clusterHostInfo = StageUtils.getClusterHostInfo(
+      Map<String, Set<String>> clusterHostInfo = StageUtils.getClusterHostInfo(
           clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap, injector.getInstance(Configuration.class));
       
       
@@ -1978,7 +1978,7 @@ public class AmbariManagementControllerImpl implements
       actionExecContext = actionExecutionHelper.validateCustomAction(actionRequest, cluster);
     }
 
-    Map<String, List<String>> clusterHostInfo = StageUtils.getClusterHostInfo(
+    Map<String, Set<String>> clusterHostInfo = StageUtils.getClusterHostInfo(
         clusters.getHostsForCluster(cluster.getClusterName()), cluster, hostsMap,
         configs);
 
@@ -1991,6 +1991,7 @@ public class AmbariManagementControllerImpl implements
     // TODO : Update parameter population to be done only here
     params.put(JDK_LOCATION, this.jdkResourceUrl);
     params.put(STACK_VERSION, cluster.getDesiredStackVersion().getStackVersion());
+    params.putAll(getRcaParameters());
 
     if (actionRequest.isCommand()) {
       customCommandExecutionHelper.addAction(actionRequest, stage, hostsMap, params);
@@ -2537,4 +2538,25 @@ public class AmbariManagementControllerImpl implements
   public String getMysqljdbcUrl() {
     return mysqljdbcUrl;
   }
+  
+  public Map<String, String> getRcaParameters() {
+
+    String hostName = StageUtils.getHostName();
+
+    String url = configs.getRcaDatabaseUrl();
+    if (url.contains(Configuration.HOSTNAME_MACRO))
+      url =
+          url.replace(Configuration.HOSTNAME_MACRO,
+              hostsMap.getHostMap(hostName));
+
+    Map<String, String> rcaParameters = new HashMap<String, String>();
+
+    rcaParameters.put(AMBARI_DB_RCA_URL, url);
+    rcaParameters.put(AMBARI_DB_RCA_DRIVER, configs.getRcaDatabaseDriver());
+    rcaParameters.put(AMBARI_DB_RCA_USERNAME, configs.getRcaDatabaseUser());
+    rcaParameters.put(AMBARI_DB_RCA_PASSWORD, configs.getRcaDatabasePassword());
+
+    return rcaParameters;
+  }
+  
 }

+ 161 - 49
ambari-server/src/main/java/org/apache/ambari/server/utils/StageUtils.java

@@ -24,26 +24,31 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeMap;
+import java.util.TreeSet;
 
 import javax.xml.bind.JAXBException;
 
+import com.google.common.base.Joiner;
 import com.google.gson.Gson;
-import com.google.inject.Injector;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.Role;
 import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.Stage;
 import org.apache.ambari.server.agent.ExecutionCommand;
-import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.HostsMap;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.Service;
 import org.apache.ambari.server.state.ServiceComponent;
 import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
 import org.apache.commons.logging.Log;
@@ -55,13 +60,19 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.SerializationConfig;
 
 public class StageUtils {
+  
+
   private static final Log LOG = LogFactory.getLog(StageUtils.class);
 
   private static Map<String, String> componentToClusterInfoKeyMap =
       new HashMap<String, String>();
 
   private volatile static Gson gson;
-  private static final String DEFAULT_PING_PORT = "8670";
+  public static final Integer DEFAULT_PING_PORT = 8670;
+
+  private static final String HOSTS_LIST = "all_hosts";
+
+  private static final String PORTS = "all_ping_ports";
 
   public static void setGson(Gson gson) {
     if (gson==null) {
@@ -86,7 +97,7 @@ public class StageUtils {
   static {
     componentToClusterInfoKeyMap.put("NAMENODE", "namenode_host");
     componentToClusterInfoKeyMap.put("JOBTRACKER", "jtnode_host");
-    componentToClusterInfoKeyMap.put("SNAMENODE", "snamenode_host");
+    componentToClusterInfoKeyMap.put("SECONDARY_NAMENODE", "snamenode_host");
     componentToClusterInfoKeyMap.put("RESOURCEMANAGER", "rm_host");
     componentToClusterInfoKeyMap.put("NODEMANAGER", "nm_hosts");
     componentToClusterInfoKeyMap.put("HISTORYSERVER", "hs_host");
@@ -190,59 +201,161 @@ public class StageUtils {
   }
 
 
-  public static Map<String, List<String>> getClusterHostInfo(
+  public static Map<String, Set<String>> getClusterHostInfo(
       Map<String, Host> allHosts, Cluster cluster, HostsMap hostsMap,
       Configuration configuration) throws AmbariException {
-    Map<String, List<String>> info = new HashMap<String, List<String>>();
-    if (cluster.getServices() != null) {
-      String hostName = getHostName();
-      for (String serviceName : cluster.getServices().keySet()) {
-        if (cluster.getServices().get(serviceName) != null) {
-          for (String componentName : cluster.getServices().get(serviceName)
-              .getServiceComponents().keySet()) {
-            String clusterInfoKey = componentToClusterInfoKeyMap
-                .get(componentName);
-            if (clusterInfoKey == null) {
-              continue;
-            }
-            ServiceComponent scomp = cluster.getServices().get(serviceName)
-                .getServiceComponents().get(componentName);
-            if (scomp.getServiceComponentHosts() != null
-                && !scomp.getServiceComponentHosts().isEmpty()) {
-              List<String> hostList = new ArrayList<String>();
-              for (String host: scomp.getServiceComponentHosts().keySet()) {
-                String mappedHost = hostsMap.getHostMap(host);
-                hostList.add(mappedHost);
-              }
-              info.put(clusterInfoKey, hostList);
-            }
-            //Set up ambari-rca connection properties, is this a hack?
-            //info.put("ambari_db_server_host", Arrays.asList(hostsMap.getHostMap(getHostName())));
-            String url = configuration.getRcaDatabaseUrl();
-            if (url.contains(Configuration.HOSTNAME_MACRO)) {
-              url = url.replace(Configuration.HOSTNAME_MACRO, hostsMap.getHostMap(hostName));
+
+    Map<String, SortedSet<Integer>> hostRolesInfo = new HashMap<String, SortedSet<Integer>>();
+    
+    Map<String, Set<String>> clusterHostInfo = new HashMap<String, Set<String>>();
+
+    //Fill hosts and ports lists
+    Set<String> hostsSet = new LinkedHashSet<String>();
+    List<Integer> portsList = new ArrayList<Integer>();
+    
+    for (Host host : allHosts.values()) {
+      
+      Integer currentPingPort = host.getCurrentPingPort() == null ?
+          DEFAULT_PING_PORT : host.getCurrentPingPort();
+      
+      hostsSet.add(host.getHostName());
+      portsList.add(currentPingPort);
+    }
+    
+    List<String> hostsList = new ArrayList<String>(hostsSet);
+    
+    //Fill host roles
+    for (Entry<String, Service> serviceEntry : cluster.getServices().entrySet()) { 
+      
+      Service service = serviceEntry.getValue();
+      
+      for (Entry<String, ServiceComponent> serviceComponentEntry : service.getServiceComponents().entrySet()) {
+        
+        ServiceComponent serviceComponent = serviceComponentEntry.getValue();
+        String componentName = serviceComponent.getName();
+
+        for (final String hostName:serviceComponent.getServiceComponentHosts().keySet()) {
+
+          if (componentToClusterInfoKeyMap.containsKey(componentName)) {
+            
+            String roleName = componentToClusterInfoKeyMap.get(componentName);
+            SortedSet<Integer> hostsForComponentsHost = hostRolesInfo.get(roleName);
+            
+            if (hostsForComponentsHost == null) {
+              hostsForComponentsHost = new TreeSet<Integer>();
+              hostRolesInfo.put(roleName, hostsForComponentsHost);
             }
-            info.put("ambari_db_rca_url", Arrays.asList(url));
-            info.put("ambari_db_rca_driver", Arrays.asList(configuration.getRcaDatabaseDriver()));
-            info.put("ambari_db_rca_username", Arrays.asList(configuration.getRcaDatabaseUser()));
-            info.put("ambari_db_rca_password", Arrays.asList(configuration.getRcaDatabasePassword()));
 
+            int hostIndex = hostsList.indexOf(hostName);
+            //Add index of host to current host role
+            hostsForComponentsHost.add(hostIndex);
           }
+          else
+            LOG.warn("Component " + componentName + " doesn't have mapped role name for cluster host info");
         }
       }
     }
+    
+    for (String roleName : componentToClusterInfoKeyMap.values()) {
+      if (hostRolesInfo.containsKey(roleName)) {
 
-    // Add a lists of all hosts and all ping ports for agents and hosts monitoring
-    List<String> allHostNames = new ArrayList<String>();
-    List<String> allHostPingPorts = new ArrayList<String>();
-    for (Host host : allHosts.values()) {
-      allHostNames.add(host.getHostName());
-      allHostPingPorts.add(host.getCurrentPingPort() == null ?
-        DEFAULT_PING_PORT : host.getCurrentPingPort().toString());
+        TreeSet<Integer> sortedSet =
+            new TreeSet<Integer>(hostRolesInfo.get(roleName));
+
+        Set<String> replacedRangesSet = replaceRanges(sortedSet);
+
+        clusterHostInfo.put(roleName, replacedRangesSet);
+
+      }
     }
-    info.put("all_hosts", allHostNames);
-    info.put("all_ping_ports", allHostPingPorts);
-    return info;
+
+    clusterHostInfo.put(HOSTS_LIST, hostsSet);
+    clusterHostInfo.put(PORTS, replaceMappedRanges(portsList));
+    
+    return clusterHostInfo;
+  }
+  
+  
+  /**
+   * Finds ranges in sorted set and replaces ranges by compact notation
+   * 
+   * <p>For example, suppose <tt>set</tt> comprises<tt> [1, 2, 3, 4, 7]</tt>.
+   * After invoking <tt>rangedSet = StageUtils.replaceRanges(set)</tt> 
+   * <tt>rangedSet</tt> will comprise
+   * <tt>["1-4", "7"]</tt>..
+   *
+   * @param  set  the source set to be ranged
+   */
+  public static Set<String> replaceRanges(SortedSet<Integer> set) {
+    
+    if (set == null)
+      return null;
+    
+    Set<String> rangedSet = new HashSet<String>();
+    
+    Integer prevElement = null;
+    Integer startOfRange = set.first();
+    
+    for (Integer i : set) {
+      if (prevElement != null && (i - prevElement) > 1 ) {
+        String rangeItem = getRangedItem(startOfRange, prevElement);
+        rangedSet.add(rangeItem);
+        startOfRange = i;
+      }
+      prevElement = i;
+    }
+    
+    rangedSet.add(getRangedItem(startOfRange, prevElement));
+    
+    return rangedSet;
+  }
+  
+  /**
+   * Finds ranges in list and replaces ranges by compact notation
+   * 
+   * <p>For example, suppose <tt>list</tt> comprises<tt> [1, 1, 2, 2, 1, 3]</tt>.
+   * After invoking <tt>rangedMappedSet = StageUtils.replaceMappedRanges(list)</tt> 
+   * <tt>rangedMappedSet</tt> will comprise
+   * <tt>["1:0-1,4", "2:2-3", "3:5"]</tt>..
+   *
+   * @param  list  the source list to be ranged
+   */
+  public static Set<String> replaceMappedRanges(List<Integer> values) {
+
+    Map<Integer, SortedSet<Integer>> convolutedValues = new HashMap<Integer, SortedSet<Integer>>();
+
+    int valueIndex = 0;
+
+    for (Integer value : values) {
+
+      SortedSet<Integer> correspValues = convolutedValues.get(value);
+
+      if (correspValues == null) {
+        correspValues = new TreeSet<Integer>();
+        convolutedValues.put(value, correspValues);
+      }
+      correspValues.add(valueIndex);
+      valueIndex++;
+    }
+
+    Set<String> result = new HashSet<String>();
+
+    for (Entry<Integer, SortedSet<Integer>> entry : convolutedValues.entrySet()) {
+      Set<String> replacedRanges = replaceRanges(entry.getValue());
+      result.add(entry.getKey() + ":" + Joiner.on(",").join(replacedRanges));
+    }
+
+    return result;
+  }
+
+  private static String getRangedItem(Integer startOfRange, Integer endOfRange) {
+    
+    String separator = (endOfRange - startOfRange) > 1 ? "-" : ",";
+    
+    String rangeItem = endOfRange.equals(startOfRange) ? 
+        endOfRange.toString() :
+          startOfRange + separator + endOfRange;
+    return rangeItem;
   }
 
   public static String getHostName() {
@@ -253,5 +366,4 @@ public class StageUtils {
       return "localhost";
     }
   }
-
 }

+ 264 - 0
ambari-server/src/main/resources/upgrade/ddl/Ambari-DDL-Oracle-UPGRADE.sql

@@ -74,6 +74,270 @@ SET
   
 ALTER TABLE stage MODIFY (cluster_host_info NOT NULL);
 
+
+
+
+CREATE OR REPLACE PACKAGE compress_cluster_host_info_pkg AS
+  TYPE nested_vachar2_tbl_type IS TABLE OF VARCHAR2(4000);
+  FUNCTION compress_cluster_host_info(p_cluster_host_info BLOB) RETURN BLOB;
+  FUNCTION get_keys(p_cluster_host_info VARCHAR2) RETURN VARCHAR2TBL PIPELINED;
+  FUNCTION get_value(p_cluster_host_info BLOB, p_param_key VARCHAR2) RETURN VARCHAR2;
+  FUNCTION split_to_array(p_string VARCHAR2,p_sep VARCHAR2) RETURN nested_vachar2_tbl_type;
+  FUNCTION index_of(p_arr  nested_vachar2_tbl_type, p_item VARCHAR2) RETURN INTEGER;
+  FUNCTION to_indexed_array(p_arr nested_vachar2_tbl_type, p_dict_arr nested_vachar2_tbl_type) RETURN nested_vachar2_tbl_type;
+  FUNCTION To_mapped_indexed_array(p_arr nested_vachar2_tbl_type) RETURN nested_vachar2_tbl_type;
+  FUNCTION Nested_table_to_string(p_arr nested_vachar2_tbl_type, p_sep VARCHAR2) RETURN VARCHAR2;
+END compress_cluster_host_info_pkg;
+
+/
+
+CREATE OR REPLACE PACKAGE BODY compress_cluster_host_info_pkg AS
+  c_regex_pattern CONSTANT VARCHAR2(29) := '":\[[a-zA-Z0-9@:/":.,-]{1,}\]';
+  
+  PROCEDURE print_nested_table(p_arr NESTED_VACHAR2_TBL_TYPE)
+  AS
+  BEGIN
+      FOR i IN p_arr.FIRST..p_arr.LAST LOOP
+          dbms_output.put_line(i
+                               || ': '
+                               || P_arr(i));
+      END LOOP;
+  END print_nested_table;
+  FUNCTION Get_keys(p_cluster_host_info VARCHAR2)
+  RETURN VARCHAR2TBL PIPELINED
+  AS
+  BEGIN
+      FOR r IN (SELECT param_key
+                FROM   (SELECT substr(regexp_substr(regexp_replace(
+                                                    p_cluster_host_info
+                                                    , c_regex_pattern
+                                                            , ' '),
+                                                      '[^ ]+'
+                                      , 1, LEVEL), 3) AS param_key
+                        FROM dual
+                        CONNECT BY LEVEL <= regexp_count(regexp_replace(
+                                                         p_cluster_host_info,
+                                                         c_regex_pattern, ' '),
+                                            '[^ ]+'))
+                WHERE  param_key IS NOT NULL
+                       AND NOT param_key LIKE '%ambari_db_rca%') LOOP
+          PIPE ROW(r.param_key);
+      END LOOP;
+  END get_keys;
+  FUNCTION get_value(p_cluster_host_info BLOB,
+                     p_param_key         VARCHAR2)
+  RETURN VARCHAR2
+  AS
+    v_param_value VARCHAR2(32767);
+  BEGIN
+      SELECT regexp_substr(utl_raw.Cast_to_varchar2(p_cluster_host_info), '"'
+                                                                          ||
+                   p_param_key
+                                                                          ||
+             '":\[["a-z0-9., ]{1,}]')
+      INTO   v_param_value
+      FROM   dual;
+
+      SELECT substr(v_param_value, length(p_param_key) + 5,
+                    dbms_lob.Getlength(v_param_value) - length(p_param_key) - 5)
+      INTO   v_param_value
+      FROM   dual;
+
+      RETURN v_param_value;
+  END get_value;
+  
+  FUNCTION compress_cluster_host_info (p_cluster_host_info BLOB)
+  RETURN BLOB
+  AS
+    CURSOR cur1(
+      p_param_name VARCHAR2) IS
+      SELECT *
+      FROM   (SELECT column_value
+                            AS
+                            param_name,
+  compress_cluster_host_info_pkg.get_value(p_cluster_host_info, column_value) AS
+  param_value
+  FROM   TABLE(compress_cluster_host_info_pkg.get_keys((
+                      utl_raw.cast_to_varchar2(p_cluster_host_info) )))) a
+  WHERE  ( a.param_name = p_param_name
+          OR ( p_param_name IS NULL
+               AND a.param_name NOT IN ( 'all_hosts', 'all_ping_ports' ) ) );
+  l_result                BLOB;
+  l_raw                   RAW(32767);
+  l_all_hosts             NESTED_VACHAR2_TBL_TYPE;
+  l_all_ping_ports        NESTED_VACHAR2_TBL_TYPE;
+  l_compressed_ping_ports NESTED_VACHAR2_TBL_TYPE;
+  l_indexed               NESTED_VACHAR2_TBL_TYPE;
+  BEGIN
+    dbms_lob.createtemporary(l_result, FALSE);
+
+    dbms_lob.OPEN(l_result, dbms_lob.lob_readwrite);
+
+    FOR r IN cur1('all_hosts') LOOP
+      l_all_hosts := split_to_array(r.param_value, ',');
+    END LOOP;
+
+    FOR r IN cur1('all_ping_ports') LOOP
+      l_all_ping_ports := split_to_array(r.param_value, ',');
+
+      dbms_output.put_line(r.param_value);
+    END LOOP;
+
+    l_compressed_ping_ports := to_mapped_indexed_array(l_all_ping_ports);
+
+    l_raw := utl_raw.cast_to_raw('{');
+
+    dbms_lob.writeappend(l_result, utl_raw.length(l_raw), l_raw);
+
+    FOR r IN cur1(NULL) LOOP
+      dbms_output.put_line(r.param_name);
+
+      l_indexed := to_indexed_array(split_to_array(r.param_value, ','),
+                 l_all_hosts);
+
+      l_raw := utl_raw.cast_to_raw('"'
+                                 || r.param_name
+                                 || '":["'
+                                 || nested_table_to_string(l_indexed, ',')
+                                 || '"],');
+
+      dbms_lob.writeappend(l_result, utl_raw.Length(l_raw), l_raw);
+    END LOOP;
+
+    l_raw := utl_raw.cast_to_raw('"all_hosts":['
+                             || nested_table_to_string(l_all_hosts, ',')
+                             || '],');
+
+    dbms_lob.writeappend(l_result, utl_raw.length(l_raw), l_raw);
+
+    l_raw := utl_raw.Cast_to_raw('"all_ping_ports":['
+                             || Nested_table_to_string(
+                                l_compressed_ping_ports,
+                                ',')
+                             || ']');
+
+    dbms_lob.Writeappend(l_result, utl_raw.Length(l_raw), l_raw);
+
+    l_raw := utl_raw.Cast_to_raw('}');
+
+    dbms_lob.writeappend(l_result, utl_raw.Length(l_raw), l_raw);
+
+    dbms_lob.CLOSE(l_result);
+
+    RETURN l_result;
+    
+  END compress_cluster_host_info;
+
+  FUNCTION split_to_array(p_string VARCHAR2,
+                          p_sep    VARCHAR2)
+  RETURN NESTED_VACHAR2_TBL_TYPE
+  AS
+    l_result NESTED_VACHAR2_TBL_TYPE;
+  BEGIN
+      SELECT Regexp_substr(p_string, '[^,]+', 1, LEVEL)
+      BULK   COLLECT INTO l_result
+      FROM   dual
+      CONNECT BY Instr(p_string, p_sep, 1, LEVEL - 1) > 0;
+
+      RETURN l_result;
+  END split_to_array;
+  
+  FUNCTION index_of(p_arr  NESTED_VACHAR2_TBL_TYPE,
+                    p_item VARCHAR2)
+  RETURN INTEGER
+  AS
+  BEGIN
+      FOR i IN p_arr.FIRST..p_arr.LAST LOOP
+          IF p_arr(i) = p_item THEN
+            RETURN i - 1;
+          END IF;
+      END LOOP;
+  END index_of;
+  
+  FUNCTION to_indexed_array(p_arr      NESTED_VACHAR2_TBL_TYPE,
+                            p_dict_arr NESTED_VACHAR2_TBL_TYPE)
+  RETURN NESTED_VACHAR2_TBL_TYPE
+  AS
+    l_index  INTEGER;
+    l_result NESTED_VACHAR2_TBL_TYPE := Nested_vachar2_tbl_type();
+  BEGIN
+      FOR i IN p_arr.first..p_arr.last LOOP
+          l_index := Index_of(p_dict_arr, P_arr(i));
+
+          l_result.Extend(1);
+
+          L_result(i) := l_index;
+      END LOOP;
+
+      RETURN l_result;
+  END to_indexed_array;
+  FUNCTION to_mapped_indexed_array(p_arr NESTED_VACHAR2_TBL_TYPE)
+  RETURN NESTED_VACHAR2_TBL_TYPE
+  AS
+    v_result       NESTED_VACHAR2_TBL_TYPE := Nested_vachar2_tbl_type();
+    v_curr_indexes VARCHAR2(32767);
+    v_prev_val     VARCHAR2(32767);
+  BEGIN
+      FOR i IN p_arr.first..p_arr.last LOOP
+          IF P_arr(i) <> v_prev_val THEN
+            v_result.extend(1);
+
+            V_result(v_result.last) := '"'
+                                       || v_curr_indexes
+                                       || '"';
+
+            v_curr_indexes := NULL;
+          END IF;
+
+          IF v_curr_indexes IS NULL THEN
+            v_curr_indexes := substr(p_arr(i), 2, length(p_arr(i)) - 2)
+                              || ':'
+                              || to_char(i - 1);
+          ELSE
+            v_curr_indexes := v_curr_indexes
+                              || ','
+                              || to_char(i - 1);
+          END IF;
+
+          v_prev_val := p_arr(i);
+      END LOOP;
+
+      IF v_curr_indexes IS NOT NULL THEN
+        v_result.extend(1);
+
+        V_result(v_result.LAST) := '"' || v_curr_indexes || '"';
+      END IF;
+
+      RETURN v_result;
+  END to_mapped_indexed_array;
+  FUNCTION nested_table_to_string(p_arr NESTED_VACHAR2_TBL_TYPE,
+                                  p_sep VARCHAR2)
+  RETURN VARCHAR2
+  AS
+    v_result VARCHAR2(32767);
+  BEGIN
+      v_result := p_arr(1);
+
+      FOR i IN p_arr.FIRST + 1 ..p_arr.LAST LOOP
+          v_result := v_result
+                      || ','
+                      || p_arr(i);
+      END LOOP;
+
+      RETURN v_result;
+  END nested_table_to_string;
+END compress_cluster_host_info_pkg; 
+/
+
+--Compress cluster host info
+UPDATE stage s
+SET s.cluster_host_info = compress_cluster_host_info_pkg.compress_cluster_host_info(s.cluster_host_info)
+WHERE dbms_lob.instr(cluster_host_info, utl_raw.cast_to_raw('ambari_db_rca'), 1, 1) > 0;
+
+--Drop compression package
+DROP PACKAGE compress_cluster_host_info_pkg;
+
+
 ALTER TABLE hosts DROP COLUMN disks_info;
 
 --Added end_time and structured output support to command execution result

+ 179 - 0
ambari-server/src/main/resources/upgrade/ddl/Ambari-DDL-Postgres-UPGRADE-1.3.0.sql

@@ -150,9 +150,188 @@ UPDATE ambari.stage sd
   AND hrc.request_id = ss.request_id
   AND sd.cluster_host_info IS NULL;
   
+
 --Set cluster_host_info column mandatory
 ALTER TABLE ambari.stage ALTER COLUMN cluster_host_info SET NOT NULL;
 
+--Compress cluster host info-----------------------------
+CREATE OR REPLACE FUNCTION get_keys(p_cluster_host_info text)
+  RETURNS setof text AS
+$_$
+DECLARE
+v_r text;
+BEGIN
+  FOR v_r IN (SELECT substr(key_tokens,3,length(key_tokens)) AS cluster_host_info_key
+  FROM regexp_split_to_table(p_cluster_host_info, E'":\[[a-z0-9":.,-]{1,}\]') AS key_tokens
+   WHERE key_tokens NOT LIKE '%ambari_db_rca_%') LOOP
+     RETURN NEXT v_r;
+  END LOOP;
+END;
+$_$ LANGUAGE plpgsql;
+
+
+
+CREATE OR REPLACE FUNCTION get_value(p_cluster_host_info text, p_param_key text)
+  RETURNS text AS
+$_$
+
+DECLARE
+v_param_value text;
+BEGIN
+
+	SELECT regexp_matches(p_cluster_host_info,
+
+	 '"' || p_param_key || E'":\[["a-z0-9., ]{1,}]') into v_param_value;
+
+	SELECT substring(v_param_value, length(p_param_key) + 9, length(v_param_value) - length(p_param_key) - 11) into v_param_value;
+
+	RETURN v_param_value;
+	
+END;
+$_$ LANGUAGE plpgsql;
+
+CREATE OR REPLACE FUNCTION compress_cluster_host_info(p_stage_id ambari.stage.stage_id%type, p_request_id ambari.stage.request_id%type) RETURNS text AS
+$_$
+DECLARE
+
+cur1 CURSOR(p_param_name text) IS 
+
+  select a.param_key, string_to_array(get_value(cast(cluster_host_info as text), a.param_key), ',') as param_value
+  from (
+    select s.stage_id, request_id, get_keys(cast(cluster_host_info as text)) as param_key, s.cluster_host_info from ambari.stage s) a
+      where stage_id = p_stage_id
+      and request_id = p_request_id
+      and (a.param_key = p_param_name or (p_param_name is null and a.param_key not in ('all_hosts', 'all_ping_ports')));
+
+v_all_hosts text[];
+v_all_ping_ports text[];
+v_indexed integer[];
+v_r record;
+v_param_key text;
+v_compressed_ping_ports text[];
+v_compressed_cluster_host_info text;
+
+BEGIN
+
+  open cur1('all_hosts');
+  fetch cur1 into v_param_key, v_all_hosts;
+  close cur1;
+
+  open cur1('all_ping_ports');
+  fetch cur1 into v_param_key, v_all_ping_ports;
+  close cur1;
+
+  v_compressed_cluster_host_info = '{';
+
+  for v_r in cur1(null) loop
+    v_indexed = to_indexed_array(v_r.param_value, v_all_hosts);
+    select v_compressed_cluster_host_info || '"' || v_r.param_key || '":["' || array_to_string(v_indexed, ',') || '"],'
+    into v_compressed_cluster_host_info;
+
+  end loop;
+
+  v_compressed_ping_ports = to_mapped_indexed_array(v_all_ping_ports);
+
+  v_compressed_cluster_host_info = v_compressed_cluster_host_info || '"all_hosts":["' || array_to_string(v_all_hosts, ',') || '"],';
+
+  v_compressed_cluster_host_info = v_compressed_cluster_host_info || '"all_ping_ports":["' || array_to_string(v_compressed_ping_ports, ',') || '"]';
+
+  v_compressed_cluster_host_info = v_compressed_cluster_host_info || '}';
+
+  return v_compressed_cluster_host_info;
+
+  
+END;
+$_$ LANGUAGE plpgsql;
+
+CREATE OR REPLACE FUNCTION index_of(p_arr text[], p_item text)
+RETURNS INT AS 
+$_$
+DECLARE
+  v_index integer;
+BEGIN
+
+    SELECT i-1
+    into v_index
+      FROM generate_subscripts(p_arr, 1) AS i
+     WHERE p_arr[i] = p_item
+  ORDER BY i;
+
+  RETURN v_index;
+END;
+$_$ LANGUAGE plpgsql;
+
+CREATE OR REPLACE FUNCTION to_indexed_array(arr text[], dict_array text[])
+RETURNS integer[] AS 
+$_$
+
+DECLARE
+
+v_result integer[];
+v_index_of integer;
+
+BEGIN
+
+  FOR i IN array_lower(arr, 1)..array_upper(arr, 1)
+    LOOP
+        v_index_of = index_of(dict_array, arr[i]);
+        select array_append(v_result, v_index_of) into v_result;
+    END LOOP;
+
+  RETURN v_result; 
+    
+END;
+$_$ LANGUAGE plpgsql;
+
+CREATE OR REPLACE FUNCTION to_mapped_indexed_array(p_arr text[])
+RETURNS text[] AS 
+$_$
+DECLARE
+v_result text[];
+v_r record;
+v_curr_indexes text;
+v_prev_val text;
+BEGIN
+
+  FOR v_r in (select (row_number() OVER (ORDER BY 1)) -1 AS ind, x AS val from (select unnest(p_arr) AS x) a) LOOP
+
+    if v_r.val <> v_prev_val then
+      v_result = array_append(v_result, v_curr_indexes);
+      v_curr_indexes = null;
+    end if;
+
+    if v_curr_indexes is null then
+      v_curr_indexes = v_r.val || ':' || v_r.ind;
+    else
+      v_curr_indexes = v_curr_indexes || ',' || v_r.ind;
+    end if;
+
+    v_prev_val = v_r.val;
+    
+  END LOOP;
+
+  if v_curr_indexes is not null then
+    v_result = array_append(v_result, v_curr_indexes);
+  end if;
+  
+  RETURN v_result; 
+    
+END;
+$_$ LANGUAGE plpgsql;
+
+--Update cluster host info to compressed values
+UPDATE ambari.stage s
+SET cluster_host_info = (decode(replace(compress_cluster_host_info(stage_id, request_id), E'\\', E'\\\\'), 'escape'))
+WHERE s.cluster_host_info LIKE '%ambari_db_rca%';
+
+--Drop compression functions
+DROP FUNCTION get_keys;
+DROP FUNCTION get_value;
+DROP FUNCTION compress_cluster_host_info;
+DROP FUNCTION to_indexed_array;
+DROP FUNCTION to_mapped_indexed_array;
+
+
 ALTER TABLE ambari.hosts DROP COLUMN disks_info;
 
 --Added end_time and structured output support to command execution result

+ 7 - 5
ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java

@@ -30,6 +30,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import com.google.common.reflect.TypeToken;
 import com.google.inject.persist.UnitOfWork;
@@ -78,7 +79,7 @@ public class TestActionScheduler {
   @Test
   public void testActionSchedule() throws Exception {
     
-    Type type = new TypeToken<Map<String, List<String>>>() {}.getType();
+    Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
     Map<String, List<String>> clusterHostInfo = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
     
     ActionQueue aq = new ActionQueue();
@@ -631,11 +632,11 @@ public class TestActionScheduler {
   @Test
   public void testClusterHostInfoCache() throws Exception {
     
-    Type type = new TypeToken<Map<String, List<String>>>() {}.getType();
+    Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
     
     //Data for stages
-    Map<String, List<String>> clusterHostInfo1 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
-    Map<String, List<String>> clusterHostInfo2 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO_UPDATED, type);
+    Map<String, Set<String>> clusterHostInfo1 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO, type);
+    Map<String, Set<String>> clusterHostInfo2 = StageUtils.getGson().fromJson(CLUSTER_HOST_INFO_UPDATED, type);
     int stageId = 1;
     int requestId1 = 1;
     int requestId2 = 2;
@@ -676,8 +677,9 @@ public class TestActionScheduler {
 
     assertTrue(ac.get(0) instanceof ExecutionCommand);
     assertEquals(String.valueOf(requestId1) + "-" + stageId, ((ExecutionCommand) (ac.get(0))).getCommandId());
+    
     assertEquals(clusterHostInfo1, ((ExecutionCommand) (ac.get(0))).getClusterHostInfo());
-
+    
 
     when(db.getStagesInProgress()).thenReturn(Collections.singletonList(s2));
     

+ 264 - 113
ambari-server/src/test/java/org/apache/ambari/server/utils/TestStageUtils.java

@@ -18,15 +18,21 @@
 package org.apache.ambari.server.utils;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 
 import javax.xml.bind.JAXBException;
 
@@ -41,6 +47,7 @@ import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.ServiceComponentHostFactory;
 import org.apache.ambari.server.state.StackId;
 import org.apache.commons.logging.Log;
@@ -50,17 +57,23 @@ import org.codehaus.jackson.map.JsonMappingException;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.ContiguousSet;
+import com.google.common.collect.DiscreteDomain;
+import com.google.common.collect.Range;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 
 public class TestStageUtils {
+  private static final String HOSTS_LIST = "all_hosts";
+
+  private static final String STACK_ID = "HDP-1.3.1";
+
   private static Log LOG = LogFactory.getLog(TestStageUtils.class);
 
   private AmbariMetaInfo ambariMetaInfo;
 
   private Injector injector;
 
-//  ServiceComponentFactory serviceComponentFactory;
   static ServiceComponentHostFactory serviceComponentHostFactory;
 
   @Before
@@ -74,71 +87,24 @@ public class TestStageUtils {
   }
 
 
-  public static void addHdfsService(Cluster cl, String [] hostList,
-      Injector injector) throws AmbariException {
-    cl.setDesiredStackVersion(new StackId("HDP-0.1"));
-    cl.addService("HDFS");
-    cl.getService("HDFS").addServiceComponent("NAMENODE");
-    cl.getService("HDFS").addServiceComponent("DATANODE");
-    cl.getService("HDFS").addServiceComponent("SECONDARY_NAMENODE");
-    cl.getService("HDFS")
-        .getServiceComponent("NAMENODE")
-        .addServiceComponentHost(
-            serviceComponentHostFactory.createNew(cl.getService("HDFS")
-                .getServiceComponent("NAMENODE"), hostList[0], false));
-    cl.getService("HDFS")
-        .getServiceComponent("SECONDARY_NAMENODE")
-        .addServiceComponentHost(
-            serviceComponentHostFactory.createNew(cl.getService("HDFS")
-                .getServiceComponent("SECONDARY_NAMENODE"), hostList[1], false)
-        );
-    for (int i = 1; i < hostList.length; i++) {
-      cl.getService("HDFS")
-          .getServiceComponent("DATANODE")
-          .addServiceComponentHost(serviceComponentHostFactory.createNew(cl.getService("HDFS")
-              .getServiceComponent("DATANODE"), hostList[i], false));
-    }
-  }
-
-  public static void addHbaseService(Cluster cl, String [] hostList,
-      Injector injector) throws AmbariException {
-    cl.setDesiredStackVersion(new StackId("HDP-0.2"));
-    cl.addService("HBASE");
-    cl.getService("HBASE").addServiceComponent("HBASE_MASTER");
-    cl.getService("HBASE").addServiceComponent("HBASE_REGIONSERVER");
-    cl.getService("HBASE")
-        .getServiceComponent("HBASE_MASTER")
-        .addServiceComponentHost(
-            serviceComponentHostFactory.createNew(cl.getService("HBASE")
-                .getServiceComponent("HBASE_MASTER"), hostList[0], false));
-    for (int i = 1; i < hostList.length; i++) {
-      cl.getService("HBASE")
-          .getServiceComponent("HBASE_REGIONSERVER")
-          .addServiceComponentHost(
-              serviceComponentHostFactory.createNew(cl.getService("HBASE")
-                  .getServiceComponent("HBASE_REGIONSERVER"), hostList[i],
-                  false));
-    }
-  }
-
-  public static void addMapreduceService(Cluster cl, String [] hostList,
-                                     Injector injector) throws AmbariException {
-    cl.setDesiredStackVersion(new StackId("HDP-0.2"));
-    cl.addService("MAPREDUCE");
-    cl.getService("MAPREDUCE").addServiceComponent("JOBTRACKER");
-    cl.getService("MAPREDUCE").addServiceComponent("TASKTRACKER");
-    cl.getService("MAPREDUCE")
-        .getServiceComponent("JOBTRACKER")
+  public static void addService(Cluster cl, List<String> hostList,
+       Map<String, List<Integer>> topology, String serviceName,
+       Injector injector) throws AmbariException {
+    cl.setDesiredStackVersion(new StackId(STACK_ID));
+    cl.addService(serviceName);
+    
+    for (Entry<String, List<Integer>> component : topology.entrySet()) {
+      
+      String componentName = component.getKey();
+      cl.getService(serviceName).addServiceComponent(componentName);
+      
+      for (Integer hostIndex : component.getValue()) {
+        cl.getService(serviceName)
+        .getServiceComponent(componentName)
         .addServiceComponentHost(
-            serviceComponentHostFactory.createNew(cl.getService("MAPREDUCE")
-                .getServiceComponent("JOBTRACKER"), hostList[0], false));
-    for (int i = 1; i < hostList.length; i++) {
-      cl.getService("MAPREDUCE")
-          .getServiceComponent("TASKTRACKER")
-          .addServiceComponentHost(
-              serviceComponentHostFactory.createNew(cl.getService("MAPREDUCE")
-                  .getServiceComponent("TASKTRACKER"), hostList[i],
-                  false));
+            serviceComponentHostFactory.createNew(cl.getService(serviceName)
+                .getServiceComponent(componentName), hostList.get(hostIndex), false));
+      }
     }
   }
 
@@ -170,11 +136,11 @@ public class TestStageUtils {
       JsonMappingException, JAXBException, IOException {
     Stage s = StageUtils.getATestStage(1, 2, "host1", "clusterHostInfo");
     ExecutionCommand cmd = s.getExecutionCommands("host1").get(0).getExecutionCommand();    
-    cmd.setConfigurationTags(new HashMap<String, Map<String,String>>() {{
-      put("global", new HashMap<String, String>() {{ put("tag", "version1"); }});
-    }});
-    
-    
+    HashMap<String, Map<String,String>> configTags = new HashMap<String, Map<String,String>>();
+    Map<String, String> globalTag = new HashMap<String, String>();
+    globalTag.put("tag", "version1");
+    configTags.put("global", globalTag );
+    cmd.setConfigurationTags(configTags);
     String json = StageUtils.jaxbToString(cmd);
     ExecutionCommand cmdDes = StageUtils.stringToExecutionCommand(json);
     assertEquals(cmd.toString(), cmdDes.toString());
@@ -184,50 +150,235 @@ public class TestStageUtils {
   @Test
   public void testGetClusterHostInfo() throws AmbariException, UnknownHostException {
     Clusters fsm = injector.getInstance(Clusters.class);
+    
+    List<String> hostList = new ArrayList<String>();
+    hostList.add("h1");
+    hostList.add("h2");
+    hostList.add("h3");
+    hostList.add("h4");
+    hostList.add("h5");
+    hostList.add("h6");
+    hostList.add("h7");
+    hostList.add("h8");
+    hostList.add("h9");
+    hostList.add("h10");
+    
+    List<Integer> pingPorts = Arrays.asList(StageUtils.DEFAULT_PING_PORT,
+        StageUtils.DEFAULT_PING_PORT,
+        StageUtils.DEFAULT_PING_PORT,
+        8671,
+        8671,
+        null,
+        8672,
+        8672,
+        null,
+        8673);
+    
     fsm.addCluster("c1");
-    fsm.addHost("h1");
-    fsm.addHost("h2");
-    fsm.addHost("h3");
-    fsm.addHost("h4");
-    fsm.getCluster("c1").setDesiredStackVersion(new StackId("HDP-0.1"));
-    fsm.getHost("h1").setOsType("centos5");
-    fsm.getHost("h2").setOsType("centos5");
-    fsm.getHost("h3").setOsType("centos5");
-    fsm.getHost("h4").setOsType("centos5");
-    fsm.getHost("h1").setCurrentPingPort(8670);
-    fsm.getHost("h2").setCurrentPingPort(null);
-    fsm.getHost("h3").setCurrentPingPort(null);
-    fsm.getHost("h4").setCurrentPingPort(8670);
-    fsm.getHost("h1").persist();
-    fsm.getHost("h2").persist();
-    fsm.getHost("h3").persist();
-    fsm.getHost("h4").persist();
-    fsm.mapHostToCluster("h1", "c1");
-    fsm.mapHostToCluster("h2", "c1");
-    fsm.mapHostToCluster("h3", "c1");
-    fsm.mapHostToCluster("h4", "c1");
-    String [] hostList = {"h1", "h2", "h3" };
-    addHdfsService(fsm.getCluster("c1"), hostList, injector);
-    addHbaseService(fsm.getCluster("c1"), hostList, injector);
-    addMapreduceService(fsm.getCluster("c1"), hostList, injector);
-    Map<String, List<String>> info = StageUtils.getClusterHostInfo(fsm.getHostsForCluster("c1"),
+    fsm.getCluster("c1").setDesiredStackVersion(new StackId(STACK_ID));
+    
+    int index = 0;
+    
+    for (String host: hostList) {
+      fsm.addHost(host);
+      fsm.getHost(host).setOsType("centos5");
+      fsm.getHost(host).setCurrentPingPort(pingPorts.get(index));
+      fsm.getHost(host).persist();
+      fsm.mapHostToCluster(host, "c1");
+      index++;
+    }
+
+    //Add HDFS service
+    Map<String, List<Integer>> hdfsTopology = new HashMap<String, List<Integer>>();
+    hdfsTopology.put("NAMENODE", Collections.singletonList(0));
+    hdfsTopology.put("SECONDARY_NAMENODE", Collections.singletonList(1));
+    List<Integer> datanodeIndexes = Arrays.asList(0,1,2,3,5,7,8,9);
+    hdfsTopology.put("DATANODE", new ArrayList<Integer>(datanodeIndexes));
+    addService(fsm.getCluster("c1"), hostList, hdfsTopology , "HDFS", injector);
+    
+    //Add HBASE service
+    Map<String, List<Integer>> hbaseTopology = new HashMap<String, List<Integer>>(); 
+    hbaseTopology.put("HBASE_MASTER", Collections.singletonList(5));
+    List<Integer> regionServiceIndexes = Arrays.asList(1,3,5,8,9);;
+    hbaseTopology.put("HBASE_REGIONSERVER", regionServiceIndexes);
+    addService(fsm.getCluster("c1"), hostList, hbaseTopology , "HBASE", injector);
+    
+    //Add MAPREDUCE service
+    Map<String, List<Integer>> mrTopology = new HashMap<String, List<Integer>>(); 
+    mrTopology.put("JOBTRACKER", Collections.singletonList(5));
+    List<Integer> taskTrackerIndexes = Arrays.asList(1,2,3,4,5,7,9);;
+    mrTopology.put("TASKTRACKER", taskTrackerIndexes);
+    addService(fsm.getCluster("c1"), hostList, mrTopology , "MAPREDUCE", injector);
+    
+    //Get cluster host info
+    Map<String, Set<String>> info = StageUtils.getClusterHostInfo(fsm.getHostsForCluster("c1"),
         fsm.getCluster("c1"), new HostsMap(injector.getInstance(Configuration.class)),
         injector.getInstance(Configuration.class));
-    assertEquals(2, info.get("slave_hosts").size());
-    assertEquals(2, info.get("mapred_tt_hosts").size());
-    assertEquals(2, info.get("hbase_rs_hosts").size());
-    assertEquals(1, info.get("hbase_master_hosts").size());
-    assertEquals(4, info.get("all_hosts").size());
-    assertEquals(4, info.get("all_ping_ports").size());
-    assertEquals("h1", info.get("hbase_master_hosts").get(0));
-    assertEquals("8670", info.get("all_ping_ports").get(0));
-    assertEquals("8670", info.get("all_ping_ports").get(1));
-    assertEquals("8670", info.get("all_ping_ports").get(2));
-    assertEquals("8670", info.get("all_ping_ports").get(3));
-
-    assertFalse(info.get("ambari_db_rca_url").get(0).contains(Configuration.HOSTNAME_MACRO));
-    String address = InetAddress.getLocalHost().getCanonicalHostName();
-    assertTrue(info.get("ambari_db_rca_url").get(0).contains(address));
 
+    //All hosts present in cluster host info
+    assertEquals(fsm.getHosts().size(), info.get(HOSTS_LIST).size());
+    for (Host host: fsm.getHosts()) {
+      assertTrue(info.get(HOSTS_LIST).contains(host.getHostName()));
+    }
+    
+    
+    //Check HDFS topology compression
+    Map<String, String> hdfsMapping = new HashMap<String, String>();
+    hdfsMapping.put("DATANODE", "slave_hosts");
+    hdfsMapping.put("NAMENODE", "namenode_host");
+    hdfsMapping.put("SECONDARY_NAMENODE", "snamenode_host");
+    checkServiceCompression(info, hdfsMapping, hdfsTopology, hostList);
+    
+    
+    //Check HBASE topology compression
+    Map<String, String> hbaseMapping = new HashMap<String, String>();
+    hbaseMapping.put("HBASE_MASTER", "hbase_master_hosts");
+    hbaseMapping.put("HBASE_REGIONSERVER", "hbase_rs_hosts");
+    checkServiceCompression(info, hbaseMapping, hbaseTopology, hostList);
+    
+    //Check MAPREDUCE topology compression
+    Map<String, String> mrMapping = new HashMap<String, String>();
+    mrMapping.put("JOBTRACKER", "jtnode_host");
+    mrMapping.put("TASKTRACKER", "mapred_tt_hosts");
+    checkServiceCompression(info, mrMapping, mrTopology, hostList);
+    
+    Set<String> actualPingPorts = info.get("all_ping_ports");
+    
+    
+    if (pingPorts.contains(null))
+      assertEquals(new HashSet<Integer>(pingPorts).size(), actualPingPorts.size() + 1);
+    else
+      assertEquals(new HashSet<Integer>(pingPorts).size(), actualPingPorts.size());
+    
+    List<Integer> pingPortsActual = getRangeMappedDecompressedSet(actualPingPorts);
+    
+    
+    List<Integer> reindexedPorts = getReindexedList(pingPortsActual, new ArrayList<String>(info.get(HOSTS_LIST)), hostList);
+    
+    //Treat null values
+    while (pingPorts.contains(null)) {
+      int indexOfNull = pingPorts.indexOf(null);
+      pingPorts.set(indexOfNull, StageUtils.DEFAULT_PING_PORT);
+    }
+
+    assertEquals(pingPorts, reindexedPorts);
+
+  }
+
+  private void checkServiceCompression(Map<String, Set<String>> info,
+      Map<String, String> serviceMapping, Map<String, List<Integer>> serviceTopology,
+      List<String> hostList) {
+    
+    
+    for (Entry<String, List<Integer>> component: serviceTopology.entrySet()) {
+      
+      String componentName = component.getKey();
+      
+      List<Integer> componentIndexesExpected = component.getValue();
+      
+      String roleName = serviceMapping.get(componentName);
+      
+      assertTrue("No mapping for " + componentName , roleName != null);
+      
+      Set<Integer> componentIndexesActual = getDecompressedSet(info.get(roleName));
+      
+      Set<String> expectedComponentHosts = new HashSet<String>();
+      
+      for (Integer i: componentIndexesExpected)
+        expectedComponentHosts.add(hostList.get(i));
+      
+      Set<String> actualSlavesHosts = new HashSet<String>();
+      
+      for (Integer i: componentIndexesActual)
+        actualSlavesHosts.add(new ArrayList<String>(info.get(HOSTS_LIST)).get(i));
+        
+      
+      
+      assertEquals(expectedComponentHosts, actualSlavesHosts);
+    
+    }
+    
   }
+
+  private Set<Integer> getDecompressedSet(Set<String> set) {
+
+    Set<Integer> resultSet = new HashSet<Integer>();
+
+    for (String index : set) {
+
+      String[] ranges = index.split(",");
+
+      for (String r : ranges) {
+
+        String[] split = r.split("-");
+
+        if (split.length == 2) {
+          Integer start = Integer.valueOf(split[0]);
+          Integer end = Integer.valueOf(split[1]);
+          ContiguousSet<Integer> rangeSet =
+          ContiguousSet.create(Range.closed(start, end), DiscreteDomain.integers()) ;
+
+          for (Integer i : rangeSet) {
+            resultSet.add(i);
+
+          }
+
+        } else {
+          resultSet.add(Integer.valueOf(split[0]));
+        }
+      }
+
+    }
+    return resultSet;
+  }
+  
+  private List<Integer> getRangeMappedDecompressedSet(Set<String> compressedSet) {
+
+    SortedMap<Integer, Integer> resultMap = new TreeMap<Integer, Integer>();
+
+    for (String token : compressedSet) {
+
+      String[] split = token.split(":");
+
+      if (split.length != 2)
+        throw new RuntimeException("Broken data, expected format - m:r, got - "
+            + token);
+
+      Integer index = Integer.valueOf(split[0]);
+
+      String rangeTokens = split[1];
+
+      Set<String> rangeTokensSet =
+          new HashSet<String>(Arrays.asList(rangeTokens.split(",")));
+
+      Set<Integer> decompressedSet = getDecompressedSet(rangeTokensSet);
+
+      for (Integer i : decompressedSet)
+        resultMap.put(i, index);
+
+    }
+
+    List<Integer> resultList = new ArrayList<Integer>(resultMap.values());
+
+    return resultList;
+
+  }
+  
+  private List<Integer> getReindexedList(List<Integer> list,
+      List<String> currentIndexes, List<String> desiredIndexes) {
+
+    SortedMap<Integer, Integer> sortedMap = new TreeMap<Integer, Integer>();
+
+    int index = 0;
+
+    for (Integer value : list) {
+      String currentIndexValue = currentIndexes.get(index);
+      Integer desiredIndexValue = desiredIndexes.indexOf(currentIndexValue);
+      sortedMap.put(desiredIndexValue, value);
+      index++;
+    }
+
+    return new ArrayList<Integer>(sortedMap.values());
+  }
+
 }